Skip to content

Commit

Permalink
[native]Add to retry get data error on memory allocation failure
Browse files Browse the repository at this point in the history
Add to retry memory allocation failure in presto exchange source as the
other http failure
Also add number of failed attempts on final error message
  • Loading branch information
xiaoxmeng authored and tanjialiang committed Oct 15, 2023
1 parent c6292d0 commit 9378300
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void PrestoExchangeSource::doRequest(
bodyAsString(*response, self->pool_.get())));
} else if (response->hasError()) {
self->processDataError(
path, maxBytes, maxWaitSeconds, response->error(), false);
path, maxBytes, maxWaitSeconds, response->error());
} else {
self->processDataResponse(std::move(response));
}
Expand Down Expand Up @@ -335,10 +335,11 @@ void PrestoExchangeSource::processDataError(

onFinalFailure(
fmt::format(
"Failed to fetch data from {}:{} {} - Exhausted retries: {}",
"Failed to fetch data from {}:{} {} - Exhausted after {} retries: {}",
host_,
port_,
path,
failedAttempts_,
error),
queue_);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "presto_cpp/main/tests/HttpServerWrapper.h"
#include "presto_cpp/main/tests/MultableConfigs.h"
#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/common/memory/MmapAllocator.h"
Expand Down Expand Up @@ -747,35 +748,67 @@ TEST_P(PrestoExchangeSourceTest, failedProducer) {
EXPECT_THROW(waitForNextPage(queue), std::exception);
}

TEST_P(PrestoExchangeSourceTest, exceedingMemoryCapacityForHttpResponse) {
const int64_t memoryCapBytes = 1 << 10;
DEBUG_ONLY_TEST_P(
PrestoExchangeSourceTest,
exceedingMemoryCapacityForHttpResponse) {
const int64_t memoryCapBytes = 1L << 30;
const bool useHttps = GetParam().useHttps;
auto rootPool = defaultMemoryManager().addRootPool("", memoryCapBytes);
auto leafPool =
rootPool->addLeafChild("exceedingMemoryCapacityForHttpResponse");

auto producer = std::make_unique<Producer>();
for (bool persistentError : {false, true}) {
SCOPED_TRACE(fmt::format("persistentError: {}", persistentError));

auto producerServer = createHttpServer(useHttps);
producer->registerEndpoints(producerServer.get());
auto rootPool = defaultMemoryManager().addRootPool("", memoryCapBytes);
const std::string leafPoolName("exceedingMemoryCapacityForHttpResponse");
auto leafPool = rootPool->addLeafChild(leafPoolName);

test::HttpServerWrapper serverWrapper(std::move(producerServer));
auto producerAddress = serverWrapper.start().get();
// Setup to allow exchange source sufficient time to retry.
SystemConfig::instance()->setValue(
std::string(SystemConfig::kExchangeMaxErrorDuration), "3s");

auto queue = makeSingleSourceQueue();
auto exchangeSource =
makeExchangeSource(producerAddress, useHttps, 3, queue, leafPool.get());
const std::string injectedErrorMessage{"Inject allocation error"};
std::atomic<int> numAllocations{0};
SCOPED_TESTVALUE_SET(
"facebook::velox::memory::MemoryPoolImpl::reserveThreadSafe",
std::function<void(MemoryPool*)>(([&](MemoryPool* pool) {
if (pool->name().compare(leafPoolName) != 0) {
return;
}
// For non-consistent error, inject memory allocation failure once.
++numAllocations;
if (numAllocations > 1 && !persistentError) {
return;
}
VELOX_FAIL(injectedErrorMessage);
})));

requestNextPage(queue, exchangeSource);
const std::string largePayload(2 * memoryCapBytes, 'L');
auto producer = std::make_unique<Producer>();

producer->enqueue(largePayload);
ASSERT_ANY_THROW(waitForNextPage(queue));
producer->noMoreData();
// Verify that we never retry on memory allocation failure of the http
// response data but just fails the query.
ASSERT_EQ(exchangeSource->testingFailedAttempts(), 1);
ASSERT_EQ(leafPool->currentBytes(), 0);
auto producerServer = createHttpServer(useHttps);
producer->registerEndpoints(producerServer.get());

test::HttpServerWrapper serverWrapper(std::move(producerServer));
auto producerAddress = serverWrapper.start().get();

auto queue = makeSingleSourceQueue();
auto exchangeSource =
makeExchangeSource(producerAddress, useHttps, 3, queue, leafPool.get());

requestNextPage(queue, exchangeSource);
const std::string payload(1 << 20, 'L');
producer->enqueue(payload);

if (persistentError) {
VELOX_ASSERT_THROW(waitForNextPage(queue), "Failed to fetch data from");
} else {
const auto receivedPage = waitForNextPage(queue);
ASSERT_EQ(toString(receivedPage.get()), payload);
}
producer->noMoreData();
// Verify that we have retried on memory allocation failure of the http
// response data but just fails the query.
ASSERT_GE(exchangeSource->testingFailedAttempts(), 1);
ASSERT_EQ(leafPool->currentBytes(), 0);
}
}

TEST_P(PrestoExchangeSourceTest, memoryAllocationAndUsageCheck) {
Expand Down

0 comments on commit 9378300

Please sign in to comment.