Skip to content

Commit

Permalink
Refactor copy test
Browse files Browse the repository at this point in the history
  • Loading branch information
royi-luo committed Nov 15, 2024
1 parent efe03f7 commit faa989d
Showing 1 changed file with 90 additions and 96 deletions.
186 changes: 90 additions & 96 deletions test/copy/copy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ class FlakyBufferManager : public storage::BufferManager {
uint64_t reserveCount = 0;
};

// Parameters for CopyTest::BMExceptionRecoveryTest. Each test case supplies its own setup,
// statement under test, and verification query through this struct.
struct BMExceptionRecoveryTestConfig {
// Forwarded as the first argument of resetDBFlaky().
bool canFailDuringExecute;
// Forwarded as the second argument of resetDBFlaky().
bool canFailDuringCheckpoint;
// Called once with the connection, after the initial resetDB() and before resetDBFlaky().
std::function<void(main::Connection*)> initFunc;
// Called on every iteration of the retry loop with the connection and the iteration index;
// returns the result of the statement under test.
std::function<std::unique_ptr<main::QueryResult>(main::Connection*, int)> executeFunc;
// Called with a result whose isSuccess() is false; returning true breaks out of the retry loop.
std::function<bool(main::QueryResult*)> earlyExitOnFailureFunc;
// Called after the database is reopened with resetDB(); the result must be successful and
// have at least one row.
std::function<std::unique_ptr<main::QueryResult>(main::Connection*)> checkFunc;
// Expected value of column 0 of the first row returned by checkFunc.
// NOTE(review): BMExceptionRecoveryTest compares this against getValue<int64_t>(), so the
// comparison mixes signed and unsigned operands — confirm that is intended.
uint64_t checkResult;
};

class CopyTest : public BaseGraphTest {
public:
void SetUp() override {
Expand Down Expand Up @@ -75,22 +85,23 @@ class CopyTest : public BaseGraphTest {
currentBM->setClientContext(conn->getClientContext());
}
std::string getInputDir() override { KU_UNREACHABLE; }
void BMExceptionRecoveryTest(BMExceptionRecoveryTestConfig cfg);
uint64_t failureFrequency;
FlakyBufferManager* currentBM;
};

TEST_F(CopyTest, NodeCopyBMExceptionRecoverySameConnection) {
void CopyTest::BMExceptionRecoveryTest(BMExceptionRecoveryTestConfig cfg) {
if (inMemMode) {
GTEST_SKIP();
}
static constexpr uint64_t dbSize = 64 * 1024 * 1024;
resetDB(dbSize);
conn->query("CREATE NODE TABLE account(ID INT64, PRIMARY KEY(ID))");
cfg.initFunc(conn.get());

// this test only checks robustness during the transaction
// we don't want to trigger BM exceptions during checkpoint
// TODO(Royi) fix checkpointing so this test passes even if BM fails during checkpoint
resetDBFlaky(true, false);
resetDBFlaky(cfg.canFailDuringExecute, cfg.canFailDuringCheckpoint);

for (int i = 0;; i++) {
ASSERT_LT(i, 20);
Expand All @@ -99,10 +110,9 @@ TEST_F(CopyTest, NodeCopyBMExceptionRecoverySameConnection) {
"COPY account FROM \"{}/dataset/snap/twitter/csv/twitter-nodes.csv\"",
KUZU_ROOT_DIRECTORY);

auto result = conn->query(queryString);
auto result = cfg.executeFunc(conn.get(), i);
if (!result->isSuccess()) {
if (result->getErrorMessage().starts_with(
"Copy exception: Found duplicated primary key value")) {
if (cfg.earlyExitOnFailureFunc(result.get())) {
break;
}
ASSERT_EQ(result->getErrorMessage(), "Buffer manager exception: Unable to allocate "
Expand All @@ -119,109 +129,93 @@ TEST_F(CopyTest, NodeCopyBMExceptionRecoverySameConnection) {
resetDB(dbSize);
{
// Test that the table copied as expected after the query
auto result = conn->query("MATCH (a:account) RETURN COUNT(*)");
auto result = cfg.checkFunc(conn.get());
ASSERT_TRUE(result->isSuccess()) << result->getErrorMessage();
ASSERT_TRUE(result->hasNext());
ASSERT_EQ(result->getNext()->getValue(0)->getValue<int64_t>(), 81306);
ASSERT_EQ(result->getNext()->getValue(0)->getValue<int64_t>(), cfg.checkResult);
}

database.reset();
conn.reset();
}

TEST_F(CopyTest, RelCopyBMExceptionRecoverySameConnection) {
if (inMemMode) {
GTEST_SKIP();
}
static constexpr uint64_t dbSize = 64 * 1024 * 1024;
resetDB(dbSize);
conn->query("CREATE NODE TABLE account(ID INT64, PRIMARY KEY(ID))");
conn->query("CREATE REL TABLE follows(FROM account TO account);");
ASSERT_TRUE(conn->query(
common::stringFormat("COPY account FROM \"{}/dataset/snap/twitter/csv/twitter-nodes.csv\"",
KUZU_ROOT_DIRECTORY)));

// this test only checks robustness during the transaction
// we don't want to trigger BM exceptions during checkpoint
resetDBFlaky(true, false);

for (int i = 0;; i++) {
ASSERT_LT(i, 20);

// there are many allocations in the partitioning phase
// we scale the failure frequency linearly so that we trigger at least one
// allocation failure in the batch insert phase
failureFrequency = 256 * (i + 30);

auto result = conn->query(common::stringFormat(
"COPY follows FROM '{}/dataset/snap/twitter/csv/twitter-edges.csv' (DELIM=' ')",
KUZU_ROOT_DIRECTORY));
if (!result->isSuccess()) {
ASSERT_EQ(result->getErrorMessage(), "Buffer manager exception: Unable to allocate "
"memory! The buffer pool is full and no "
"memory could be freed!");
TEST_F(CopyTest, NodeCopyBMExceptionRecoverySameConnection) {
BMExceptionRecoveryTestConfig cfg{.canFailDuringExecute = true,
.canFailDuringCheckpoint = false,
.initFunc =
[](main::Connection* conn) {
conn->query("CREATE NODE TABLE account(ID INT64, PRIMARY KEY(ID))");
},
.executeFunc =
[](main::Connection* conn, int) {
const auto queryString = common::stringFormat(
"COPY account FROM \"{}/dataset/snap/twitter/csv/twitter-nodes.csv\"",
KUZU_ROOT_DIRECTORY);

// clear the BM so that the failure frequency isn't messed with by cached pages
while (0 != currentBM->evictPages())
;
} else {
// the copy shouldn't succeed first try
ASSERT_GT(i, 0);
break;
}
}
return conn->query(queryString);
},
.earlyExitOnFailureFunc = [](main::QueryResult*) { return false; },
.checkFunc =
[](main::Connection* conn) { return conn->query("MATCH (a:account) RETURN COUNT(*)"); },
.checkResult = 81306};
BMExceptionRecoveryTest(cfg);
}

// Reopen the DB so no spurious errors occur during the query
resetDB(dbSize);
{
// Test that the table copied as expected after the query
auto result = conn->query("MATCH (a:account)-[:follows]->(b:account) RETURN COUNT(*)");
ASSERT_TRUE(result->isSuccess()) << result->getErrorMessage();
ASSERT_TRUE(result->hasNext());
ASSERT_EQ(result->getNext()->getValue(0)->getValue<int64_t>(), 2420766);
}
// Runs BMExceptionRecoveryTest for a COPY into the `follows` rel table: the node table is
// created and loaded during init, the rel COPY is the statement under test, and the check
// counts the (account)-[:follows]->(account) matches.
TEST_F(CopyTest, RelCopyBMExceptionRecoverySameConnection) {
BMExceptionRecoveryTestConfig cfg{.canFailDuringExecute = true,
.canFailDuringCheckpoint = false,
.initFunc =
[](main::Connection* conn) {
conn->query("CREATE NODE TABLE account(ID INT64, PRIMARY KEY(ID))");
conn->query("CREATE REL TABLE follows(FROM account TO account);");
// NOTE(review): query() appears to return a std::unique_ptr<main::QueryResult>, in which
// case this assertion checks only that the pointer is non-null, not that the COPY
// succeeded (isSuccess()) — confirm.
ASSERT_TRUE(conn->query(common::stringFormat(
"COPY account FROM \"{}/dataset/snap/twitter/csv/twitter-nodes.csv\"",
KUZU_ROOT_DIRECTORY)));
},
.executeFunc =
[this](main::Connection* conn, int i) {
// there are many allocations in the partitioning phase
// we scale the failure frequency linearly so that we trigger at least one
// allocation failure in the batch insert phase
failureFrequency = 512 * (i + 15);

// NOTE(review): the next two statements look like removed-side residue from the diff view
// this text was taken from (the same pair closes the pre-refactor test bodies); confirm
// before treating them as part of this lambda.
database.reset();
conn.reset();
return conn->query(common::stringFormat(
"COPY follows FROM '{}/dataset/snap/twitter/csv/twitter-edges.csv' (DELIM=' ')",
KUZU_ROOT_DIRECTORY));
},
.earlyExitOnFailureFunc =
[this](main::QueryResult*) {
// clear the BM so that the failure frequency isn't messed with by cached pages
// (evictPages() is called repeatedly until it returns 0)
while (0 != currentBM->evictPages())
;
// never exit early: the retry loop continues after a failed attempt
return false;
},
.checkFunc =
[](main::Connection* conn) {
return conn->query("MATCH (a:account)-[:follows]->(b:account) RETURN COUNT(*)");
},
// expected COUNT(*) returned by checkFunc
.checkResult = 2420766};
BMExceptionRecoveryTest(cfg);
}

TEST_F(CopyTest, NodeInsertBMExceptionDuringCommitRecovery) {
// The BM exception should only be thrown during commit
resetDBFlaky(false, false);
failureFrequency = 128;

conn->query("CREATE NODE TABLE account(ID INT64, PRIMARY KEY(ID))");

static constexpr uint64_t numValues = 200000;
const auto performInserts = [this]() -> bool {
const auto queryString =
common::stringFormat("UNWIND RANGE(1,{}) AS i CREATE (a:account {ID:i})", numValues);

auto result = conn->query(queryString);
if (!result->isSuccess()) {
EXPECT_EQ(result->getErrorMessage(), "Buffer manager exception: Unable to allocate "
"memory! The buffer pool is full and no "
"memory could be freed!");
return false;
}
return true;
};
for (int i = 0;; ++i) {
ASSERT_LT(i, 20);
if (performInserts()) {
break;
}
}
{
// Test that the table copied as expected after the query
auto result = conn->query("MATCH (a:account) RETURN COUNT(*)");
ASSERT_TRUE(result->isSuccess()) << result->getErrorMessage();
ASSERT_TRUE(result->hasNext());
ASSERT_EQ(result->getNext()->getValue(0)->getValue<int64_t>(), numValues);
}
BMExceptionRecoveryTestConfig cfg{.canFailDuringExecute = false,
.canFailDuringCheckpoint = false,
.initFunc =
[this](main::Connection* conn) {
failureFrequency = 128;
conn->query("CREATE NODE TABLE account(ID INT64, PRIMARY KEY(ID))");
},
.executeFunc =
[](main::Connection* conn, int) {
const auto queryString = common::stringFormat(
"UNWIND RANGE(1,{}) AS i CREATE (a:account {ID:i})", numValues);

database.reset();
conn.reset();
return conn->query(queryString);
},
.earlyExitOnFailureFunc = [](main::QueryResult*) { return false; },
.checkFunc =
[](main::Connection* conn) { return conn->query("MATCH (a:account) RETURN COUNT(*)"); },
.checkResult = numValues};
BMExceptionRecoveryTest(cfg);
}

TEST_F(CopyTest, OutOfMemoryRecovery) {
Expand Down

0 comments on commit faa989d

Please sign in to comment.