Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[native] Share HTTP connection pools between HTTP clients #21367

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ PeriodicServiceInventoryManager::PeriodicServiceInventoryManager(

// Starts the manager: launches the event base thread, creates the HTTP session
// pool, clears the stopped flag and schedules the first sendRequest() call on
// the event base.
void PeriodicServiceInventoryManager::start() {
eventBaseThread_.start(id_);
// Session pool whose raw pointer is handed to the HttpClient created in
// sendRequest().
sessionPool_ = std::make_unique<proxygen::SessionPool>(nullptr, 10);
stopped_ = false;
auto* eventBase = eventBaseThread_.getEventBase();
// Drop the client and the session pool when the event base is destroyed.
// NOTE(review): presumably so that both are released on the event base
// thread rather than on the caller's thread — confirm.
eventBase->runOnDestruction([this] { client_.reset(); });
eventBase->runOnDestruction([this] { sessionPool_.reset(); });
// Kick off the first request on the event base.
eventBase->schedule([this]() { return sendRequest(); });
}

Expand Down Expand Up @@ -66,6 +67,7 @@ void PeriodicServiceInventoryManager::sendRequest() {
std::swap(serviceAddress_, newAddress);
client_ = std::make_shared<http::HttpClient>(
eventBaseThread_.getEventBase(),
sessionPool_.get(),
serviceAddress_,
std::chrono::milliseconds(10'000),
std::chrono::milliseconds(0),
Expand Down Expand Up @@ -137,4 +139,4 @@ void PeriodicServiceInventoryManager::scheduleNext() {
std::chrono::steady_clock::now() +
std::chrono::milliseconds(getDelayMs()));
}
} // namespace facebook::presto
} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class PeriodicServiceInventoryManager {
const double backOffjitterParam_{0.1};

folly::EventBaseThread eventBaseThread_;
std::unique_ptr<proxygen::SessionPool> sessionPool_;
folly::SocketAddress serviceAddress_;
std::shared_ptr<http::HttpClient> client_;
std::atomic_bool stopped_{true};
Expand Down
94 changes: 68 additions & 26 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ PrestoExchangeSource::PrestoExchangeSource(
const std::shared_ptr<exec::ExchangeQueue>& queue,
memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::IOThreadPoolExecutor* httpExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
const std::string& clientCertAndKeyPath,
const std::string& ciphers)
: ExchangeSource(extractTaskId(baseUri.path()), destination, queue, pool),
Expand All @@ -85,8 +86,7 @@ PrestoExchangeSource::PrestoExchangeSource(
ciphers_(ciphers),
immediateBufferTransfer_(
SystemConfig::instance()->exchangeImmediateBufferTransfer()),
driverExecutor_(driverExecutor),
httpExecutor_(httpExecutor) {
driverExecutor_(driverExecutor) {
folly::SocketAddress address;
if (folly::IPAddress::validate(host_)) {
address = folly::SocketAddress(folly::IPAddress(host_), port_);
Expand All @@ -100,11 +100,11 @@ PrestoExchangeSource::PrestoExchangeSource(
std::chrono::duration_cast<std::chrono::milliseconds>(
SystemConfig::instance()->exchangeConnectTimeoutMs());
VELOX_CHECK_NOT_NULL(driverExecutor_);
VELOX_CHECK_NOT_NULL(httpExecutor_);
VELOX_CHECK_NOT_NULL(ioEventBase);
VELOX_CHECK_NOT_NULL(pool_);
auto* ioEventBase = httpExecutor_->getEventBase();
httpClient_ = std::make_shared<http::HttpClient>(
ioEventBase,
sessionPool,
address,
requestTimeoutMs,
connectTimeoutMs,
Expand Down Expand Up @@ -338,24 +338,25 @@ void PrestoExchangeSource::processDataError(
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
const std::string& error,
bool retry) {
const std::string& error) {
++failedAttempts_;
if (retry && !dataRequestRetryState_.isExhausted()) {
if (!dataRequestRetryState_.isExhausted()) {
VLOG(1) << "Failed to fetch data from " << host_ << ":" << port_ << " "
<< path << " - Retrying: " << error;
<< path << ", duration: " << dataRequestRetryState_.durationMs()
<< "ms - Retrying: " << error;

doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds);
return;
}

onFinalFailure(
fmt::format(
"Failed to fetch data from {}:{} {} - Exhausted after {} retries: {}",
"Failed to fetch data from {}:{} {} - Exhausted after {} retries, duration {}ms: {}",
host_,
port_,
path,
failedAttempts_,
dataRequestRetryState_.durationMs(),
error),
queue_);

Expand Down Expand Up @@ -457,34 +458,75 @@ std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::getSelfPtr() {
return std::dynamic_pointer_cast<PrestoExchangeSource>(shared_from_this());
}

// Returns the connection pool registered for 'endpoint'. If none exists yet,
// one is created under the write lock, bound to an event base obtained from
// 'ioExecutor', and stored for subsequent callers.
const ConnectionPool& ConnectionPools::get(
    const proxygen::Endpoint& endpoint,
    folly::IOThreadPoolExecutor* ioExecutor) {
  const ConnectionPool* result =
      pools_.withULockPtr([&](auto upgradeLock) -> const ConnectionPool* {
        // Fast path: the endpoint already has a pool.
        if (auto found = upgradeLock->find(endpoint);
            found != upgradeLock->end()) {
          return found->second.get();
        }

        // Slow path: upgrade to a write lock and populate the slot.
        auto writeLock = upgradeLock.moveFromUpgradeToWrite();
        auto& slot = (*writeLock)[endpoint];
        if (slot == nullptr) {
          slot = std::make_unique<ConnectionPool>();
          slot->eventBase = ioExecutor->getEventBase();
          slot->sessionPool =
              std::make_unique<proxygen::SessionPool>(nullptr, 10);
          // Creation of the timer is not thread safe, so we do it here instead
          // of in the constructor of HttpClient.
          slot->eventBase->timer();
        }
        return slot.get();
      });
  return *result;
}

// Releases every registered connection pool. Each session pool is moved into
// a no-op task queued on its own event base, so the session pool is destroyed
// together with that task; the registry is then emptied.
void ConnectionPools::destroy() {
  auto lockedPools = pools_.wlock();
  for (auto& entry : *lockedPools) {
    auto& connPool = entry.second;
    // The closure's only job is to own the session pool until it is destroyed.
    connPool->eventBase->runInEventBaseThread(
        [sessionPool = std::move(connPool->sessionPool)] {});
  }
  lockedPools->clear();
}

// static
std::shared_ptr<exec::ExchangeSource> PrestoExchangeSource::create(
std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::create(
const std::string& url,
int destination,
const std::shared_ptr<exec::ExchangeQueue>& queue,
memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::IOThreadPoolExecutor* httpExecutor) {
if (strncmp(url.c_str(), "http://", 7) == 0) {
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools& connectionPools) {
folly::Uri uri(url);
if (uri.scheme() == "http") {
proxygen::Endpoint ep(uri.host(), uri.port(), false);
auto& connPool = connectionPools.get(ep, ioExecutor);
return std::make_shared<PrestoExchangeSource>(
folly::Uri(url),
uri,
destination,
queue,
pool,
driverExecutor,
httpExecutor);
} else if (strncmp(url.c_str(), "https://", 8) == 0) {
const auto systemConfig = SystemConfig::instance();
memoryPool,
cpuExecutor,
connPool.eventBase,
connPool.sessionPool.get());
}
if (uri.scheme() == "https") {
proxygen::Endpoint ep(uri.host(), uri.port(), true);
auto& connPool = connectionPools.get(ep, ioExecutor);
const auto* systemConfig = SystemConfig::instance();
const auto clientCertAndKeyPath =
systemConfig->httpsClientCertAndKeyPath().value_or("");
const auto ciphers = systemConfig->httpsSupportedCiphers();
return std::make_shared<PrestoExchangeSource>(
folly::Uri(url),
uri,
destination,
queue,
pool,
driverExecutor,
httpExecutor,
memoryPool,
cpuExecutor,
connPool.eventBase,
connPool.sessionPool.get(),
clientCertAndKeyPath,
ciphers);
}
Expand Down
53 changes: 45 additions & 8 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,38 @@

namespace facebook::presto {

// HTTP connection pool for a specific endpoint with its associated event base.
// All the operations on the SessionPool must be performed on the corresponding
// EventBase.
struct ConnectionPool {
  // Event base the session pool is bound to. Not owned. Initialized to
  // nullptr so a default-constructed pool never carries an indeterminate
  // pointer.
  folly::EventBase* eventBase{nullptr};
  // Session pool owned by this entry.
  std::unique_ptr<proxygen::SessionPool> sessionPool;
};

// Registry of the per-endpoint connection pools used by the HTTP clients of
// PrestoExchangeSource. It must outlive all the PrestoExchangeSources and is
// passed in whenever an exchange source is created.
class ConnectionPools {
 private:
  // Endpoint -> pool mapping guarded by 'pools_'.
  using PoolMap = folly::F14FastMap<
      proxygen::Endpoint,
      std::unique_ptr<ConnectionPool>,
      proxygen::EndpointHash,
      proxygen::EndpointEqual>;

 public:
  // Releases any pools that are still registered.
  ~ConnectionPools() {
    destroy();
  }

  // Returns the pool for 'endpoint', creating it on first use with an event
  // base obtained from 'ioExecutor'.
  const ConnectionPool& get(
      const proxygen::Endpoint& endpoint,
      folly::IOThreadPoolExecutor* ioExecutor);

  // Hands every session pool over to its event base for destruction and
  // clears the registry.
  void destroy();

 private:
  folly::Synchronized<PoolMap> pools_;
};

class PrestoExchangeSource : public velox::exec::ExchangeSource {
public:
class RetryState {
Expand All @@ -48,10 +80,14 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
.count();
}

// Returns the current time in milliseconds minus startMs_.
int64_t durationMs() const {
  const auto nowMs = velox::getCurrentTimeMs();
  return nowMs - startMs_;
}

// Returns whether we have exhausted all retries. We only retry while the time
// elapsed since we first started does not exceed maxWaitMs_.
bool isExhausted() const {
  // Single source of truth for the elapsed time: reuse durationMs() instead of
  // repeating the arithmetic (the duplicate return that followed was
  // unreachable).
  return durationMs() > maxWaitMs_;
}

private:
Expand All @@ -70,7 +106,8 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::IOThreadPoolExecutor* httpExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
const std::string& clientCertAndKeyPath_ = "",
const std::string& ciphers_ = "");

Expand Down Expand Up @@ -100,13 +137,15 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;

static std::shared_ptr<ExchangeSource> create(
// Create an exchange source using pooled connections.
static std::shared_ptr<PrestoExchangeSource> create(
const std::string& url,
int destination,
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* pool,
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor);
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools& connectionPools);

/// Completes the future returned by 'request()' if it hasn't completed
/// already.
Expand Down Expand Up @@ -179,8 +218,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
const std::string& error,
bool retry = true);
const std::string& error);

void acknowledgeResults(int64_t ackSequence);

Expand Down Expand Up @@ -218,7 +256,6 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
const bool immediateBufferTransfer_;

folly::CPUThreadPoolExecutor* const driverExecutor_;
folly::IOThreadPoolExecutor* const httpExecutor_;

std::shared_ptr<http::HttpClient> httpClient_;
RetryState dataRequestRetryState_;
Expand Down
7 changes: 5 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "CoordinatorDiscoverer.h"
#include "presto_cpp/main/Announcer.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
Expand Down Expand Up @@ -328,7 +327,8 @@ void PrestoServer::run() {
queue,
pool,
driverExecutor_.get(),
exchangeHttpExecutor_.get());
exchangeHttpExecutor_.get(),
exchangeSourceConnectionPools_);
});

facebook::velox::exec::ExchangeSource::registerFactory(
Expand Down Expand Up @@ -452,6 +452,9 @@ void PrestoServer::run() {

unregisterConnectors();

PRESTO_SHUTDOWN_LOG(INFO) << "Releasing HTTP connection pools";
exchangeSourceConnectionPools_.destroy();

PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining driver CPU Executor '" << driverExecutor_->getName()
<< "': threads: " << driverExecutor_->numActiveThreads() << "/"
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "presto_cpp/main/CPUMon.h"
#include "presto_cpp/main/CoordinatorDiscoverer.h"
#include "presto_cpp/main/PeriodicHeartbeatManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServerOperations.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryAllocator.h"
Expand Down Expand Up @@ -188,6 +189,8 @@ class PrestoServer {
// Executor for spilling.
std::shared_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;

ConnectionPools exchangeSourceConnectionPools_;

// Instance of MemoryAllocator used for all query memory allocations.
std::shared_ptr<velox::memory::MemoryAllocator> allocator_;

Expand Down
Loading