Skip to content

Commit

Permalink
[native]Fix duplicate root memory pool in case of background arbitration
Browse files Browse the repository at this point in the history
If a task from a query completes before the other tasks get scheduled on the same
worker, the query context might be destroyed before the other tasks from the same
query are created. The query context manager maintains a query context cache, but
it only holds weak pointers. So when the next task from the same query arrives, it
creates a new query context with the same query id, which in turn creates a root
memory pool with the same name in the memory manager. If there is a background
memory arbitration process, the old memory pool instance might still be held by
that process, and the creation then fails with a duplicate root memory pool
error.
This PR fixes this issue by appending a monotonically increasing id to the root
memory pool name to ensure that each instance name is unique.
  • Loading branch information
xiaoxmeng committed Nov 15, 2023
1 parent 2c5bdb4 commit b2fc43c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
18 changes: 17 additions & 1 deletion presto-native-execution/presto_cpp/main/QueryContextManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,15 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
}

velox::core::QueryConfig queryConfig{std::move(configStrings)};
// NOTE: the monotonically increasing 'poolId' is appended to 'queryId' to
// ensure that the name of the root memory pool instance is always unique. In
// some edge cases, we found that background activities such as the
// long-running memory arbitration process can still hold the query root
// memory pool even though the query ctx has been evicted from the cache. The
// query ctx cache is still indexed by the query id.
static std::atomic_uint64_t poolId{0};
auto pool = memory::defaultMemoryManager().addRootPool(
queryId,
fmt::format("{}_{}", queryId, poolId++),
queryConfig.queryMaxMemoryPerNode() != 0
? queryConfig.queryMaxMemoryPerNode()
: SystemConfig::instance()->queryMaxMemoryPerNode(),
Expand Down Expand Up @@ -163,4 +170,13 @@ void QueryContextManager::visitAllContexts(
}
}

/// Testing-only hook: acquires the query context cache through wlock() and
/// asks it to drop every entry it currently holds.
void QueryContextManager::testingClearCache() {
  // Keep the locked handle alive for the whole clear operation.
  auto lockedCache = queryContextCache_.wlock();
  lockedCache->testingClear();
}

/// Testing-only helper that empties both members backing the cache
/// ('queryCtxs_' and 'queryIds_') so that no query context stays cached.
/// NOTE(review): no locking happens here; the caller visible in this file
/// (QueryContextManager::testingClearCache) invokes it through wlock().
void QueryContextCache::testingClear() {
queryCtxs_.clear();
queryIds_.clear();
}

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class QueryContextCache {
return queryCtxs_;
}

void testingClear();

private:
size_t capacity_;

Expand All @@ -106,11 +108,14 @@ class QueryContextManager {
const protocol::TaskId& taskId,
const protocol::SessionRepresentation& session);

// Calls the given functor for every present query context.
/// Calls the given functor for every present query context.
void visitAllContexts(std::function<void(
const protocol::QueryId&,
const velox::core::QueryCtx*)> visitor) const;

/// Test method to clear the query context cache.
void testingClearCache();

private:
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
const protocol::TaskId& taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* limitations under the License.
*/
#include "presto_cpp/main/QueryContextManager.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "presto_cpp/main/TaskManager.h"

Expand Down Expand Up @@ -68,4 +69,50 @@ TEST_F(QueryContextManagerTest, defaultSessionProperties) {
EXPECT_EQ(queryCtx->queryConfig().spillWriteBufferSize(), 1L << 20);
}

// Looks up the query context for the same task id twice and checks whether the
// second lookup yields a root memory pool with a different name, depending on
// whether the first context is still referenced and whether the query context
// cache has been cleared in between.
TEST_F(QueryContextManagerTest, duplicateQueryRootPoolName) {
  const protocol::TaskId fakeTaskId = "scan.0.0.1.0";
  const protocol::SessionRepresentation fakeSession{.systemProperties = {}};
  auto* queryCtxManager = taskManager_->getQueryContextManager();

  struct TestSetting {
    // Keep the first query ctx referenced while the second lookup runs.
    bool hasPendingReference;
    // Clear the query ctx cache between the two lookups.
    bool clearCache;
    // Whether the second lookup is expected to report a different pool name.
    bool expectedNewPoolName;

    std::string debugString() const {
      return fmt::format(
          "hasPendingReference: {}, clearCache: {}, expectedNewPoolName: {}",
          hasPendingReference,
          clearCache,
          expectedNewPoolName);
    }
  };
  const TestSetting testSettings[] = {
      {true, true, true},
      {true, false, false},
      {false, true, true},
      {false, false, true}};

  for (const auto& setting : testSettings) {
    SCOPED_TRACE(setting.debugString());
    // Start every scenario from an empty cache.
    queryCtxManager->testingClearCache();

    auto firstCtx =
        queryCtxManager->findOrCreateQueryCtx(fakeTaskId, fakeSession);
    // Copy the name: 'firstCtx' may be released below.
    const auto firstPoolName = firstCtx->pool()->name();
    ASSERT_THAT(firstPoolName, testing::HasSubstr("scan_"));

    if (!setting.hasPendingReference) {
      firstCtx.reset();
    }
    if (setting.clearCache) {
      queryCtxManager->testingClearCache();
    }

    auto secondCtx =
        queryCtxManager->findOrCreateQueryCtx(fakeTaskId, fakeSession);
    const auto secondPoolName = secondCtx->pool()->name();
    ASSERT_THAT(secondPoolName, testing::HasSubstr("scan_"));

    if (!setting.expectedNewPoolName) {
      ASSERT_EQ(firstPoolName, secondPoolName);
    } else {
      ASSERT_NE(firstPoolName, secondPoolName);
    }
  }
}
} // namespace facebook::presto

0 comments on commit b2fc43c

Please sign in to comment.