From 9378300f592246adb7e8c1f14e6f9ffcaeb7b762 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Sat, 14 Oct 2023 11:23:49 -0700 Subject: [PATCH] [native]Add to retry get data error on memory allocation failure Add to retry memory allocation failure in presto exchange source as the other http failure Also add number of failed attempts on final error message --- .../presto_cpp/main/PrestoExchangeSource.cpp | 5 +- .../main/tests/PrestoExchangeSourceTest.cpp | 77 +++++++++++++------ 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index 92070c7fa60a7..b6279b0f119f1 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -195,7 +195,7 @@ void PrestoExchangeSource::doRequest( bodyAsString(*response, self->pool_.get()))); } else if (response->hasError()) { self->processDataError( - path, maxBytes, maxWaitSeconds, response->error(), false); + path, maxBytes, maxWaitSeconds, response->error()); } else { self->processDataResponse(std::move(response)); } @@ -335,10 +335,11 @@ void PrestoExchangeSource::processDataError( onFinalFailure( fmt::format( - "Failed to fetch data from {}:{} {} - Exhausted retries: {}", + "Failed to fetch data from {}:{} {} - Exhausted after {} retries: {}", host_, port_, path, + failedAttempts_, error), queue_); diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index 1cf73910e273d..06a512483f750 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -22,6 +22,7 @@ #include "presto_cpp/main/tests/HttpServerWrapper.h" #include "presto_cpp/main/tests/MultableConfigs.h" #include "presto_cpp/presto_protocol/presto_protocol.h" +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" #include "velox/common/memory/MemoryAllocator.h" #include "velox/common/memory/MmapAllocator.h" @@ -747,35 +748,67 @@ TEST_P(PrestoExchangeSourceTest, failedProducer) { EXPECT_THROW(waitForNextPage(queue), std::exception); } -TEST_P(PrestoExchangeSourceTest, exceedingMemoryCapacityForHttpResponse) { - const int64_t memoryCapBytes = 1 << 10; +DEBUG_ONLY_TEST_P( + PrestoExchangeSourceTest, + exceedingMemoryCapacityForHttpResponse) { + const int64_t memoryCapBytes = 1L << 30; const bool useHttps = GetParam().useHttps; - auto rootPool = defaultMemoryManager().addRootPool("", memoryCapBytes); - auto leafPool = - rootPool->addLeafChild("exceedingMemoryCapacityForHttpResponse"); - auto producer = std::make_unique(); + for (bool persistentError : {false, true}) { + SCOPED_TRACE(fmt::format("persistentError: {}", persistentError)); - auto producerServer = createHttpServer(useHttps); - producer->registerEndpoints(producerServer.get()); + auto rootPool = defaultMemoryManager().addRootPool("", memoryCapBytes); + const std::string leafPoolName("exceedingMemoryCapacityForHttpResponse"); + auto leafPool = rootPool->addLeafChild(leafPoolName); - test::HttpServerWrapper serverWrapper(std::move(producerServer)); - auto producerAddress = serverWrapper.start().get(); + // Setup to allow exchange source sufficient time to retry. + SystemConfig::instance()->setValue( + std::string(SystemConfig::kExchangeMaxErrorDuration), "3s"); - auto queue = makeSingleSourceQueue(); - auto exchangeSource = - makeExchangeSource(producerAddress, useHttps, 3, queue, leafPool.get()); + const std::string injectedErrorMessage{"Inject allocation error"}; + std::atomic numAllocations{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::memory::MemoryPoolImpl::reserveThreadSafe", + std::function(([&](MemoryPool* pool) { + if (pool->name().compare(leafPoolName) != 0) { + return; + } + // For non-consistent error, inject memory allocation failure once. + ++numAllocations; + if (numAllocations > 1 && !persistentError) { + return; + } + VELOX_FAIL(injectedErrorMessage); + }))); - requestNextPage(queue, exchangeSource); - const std::string largePayload(2 * memoryCapBytes, 'L'); + auto producer = std::make_unique(); - producer->enqueue(largePayload); - ASSERT_ANY_THROW(waitForNextPage(queue)); - producer->noMoreData(); - // Verify that we never retry on memory allocation failure of the http - // response data but just fails the query. - ASSERT_EQ(exchangeSource->testingFailedAttempts(), 1); - ASSERT_EQ(leafPool->currentBytes(), 0); + auto producerServer = createHttpServer(useHttps); + producer->registerEndpoints(producerServer.get()); + + test::HttpServerWrapper serverWrapper(std::move(producerServer)); + auto producerAddress = serverWrapper.start().get(); + + auto queue = makeSingleSourceQueue(); + auto exchangeSource = + makeExchangeSource(producerAddress, useHttps, 3, queue, leafPool.get()); + + requestNextPage(queue, exchangeSource); + const std::string payload(1 << 20, 'L'); + producer->enqueue(payload); + + if (persistentError) { + VELOX_ASSERT_THROW(waitForNextPage(queue), "Failed to fetch data from"); + } else { + const auto receivedPage = waitForNextPage(queue); + ASSERT_EQ(toString(receivedPage.get()), payload); + } + producer->noMoreData(); + // Verify that we have retried on memory allocation failure of the http + // response data but just fails the query. + ASSERT_GE(exchangeSource->testingFailedAttempts(), 1); + ASSERT_EQ(leafPool->currentBytes(), 0); + } } TEST_P(PrestoExchangeSourceTest, memoryAllocationAndUsageCheck) {