Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rollback during Node Table COPY #4467

Merged
merged 28 commits into from
Dec 2, 2024
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7b295fa
Fix COPY rollback
royi-luo Nov 4, 2024
756fef4
Make VersionRecord all stack-allocated
royi-luo Nov 18, 2024
b9af266
Try fix tests
royi-luo Nov 18, 2024
789c54e
Combine interfaces for creating undo buffer info
royi-luo Nov 18, 2024
6a617e0
Properly maintain numTotalRows for rel table data
royi-luo Nov 18, 2024
ac3e9e1
Code cleanup
royi-luo Nov 18, 2024
99e2ef1
Avoid appending to undo buffer for local tables
royi-luo Nov 18, 2024
1308842
Bug fixes
royi-luo Nov 18, 2024
8c72b07
Actually pass rollback insert callback
royi-luo Nov 19, 2024
ffd0cd5
Add second layer of iterators to undo buffer
royi-luo Nov 19, 2024
739b5d8
Bug fixes + code cleanup
royi-luo Nov 19, 2024
14fb9a1
Cleanup node table + actually use semi mask
royi-luo Nov 19, 2024
76ed7d0
Actually enable semi mask
royi-luo Nov 20, 2024
5a1e353
Self-review
royi-luo Nov 20, 2024
a87f669
Add tests
royi-luo Nov 20, 2024
07cef58
Reclaim overflow slots in in-mem hash index after delete
royi-luo Nov 21, 2024
a91e448
Address review comments
royi-luo Nov 29, 2024
9134abb
Replace construct iterator callback with virtual class
royi-luo Nov 29, 2024
c3c638e
Refactor version record handler
royi-luo Nov 29, 2024
89f713f
Refactor version record handler again
royi-luo Nov 29, 2024
08757c3
Rollback insert for node groups
royi-luo Nov 29, 2024
6b30e35
Remove unused forward declares
royi-luo Nov 29, 2024
1a55443
Get correct num of total rows to rollback in node group collection
royi-luo Nov 29, 2024
84b6e17
Make BM exception during rel commit trigger earlier
royi-luo Nov 29, 2024
40691ca
Update num total rows for rel table data node group collection
royi-luo Nov 30, 2024
4ad2632
Address review comments
royi-luo Dec 2, 2024
c052e90
Rework nextChainedSlots() for in mem hash index
royi-luo Dec 2, 2024
cec96e5
Update splitSlots so behaviour is same as before nextChainedSlot() re…
royi-luo Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Replace construct iterator callback with virtual class
royi-luo committed Dec 2, 2024
commit 9134abb0d12d6ea3590b8094e983859b9e6ac05d
25 changes: 17 additions & 8 deletions src/include/storage/store/chunked_group_undo_iterator.h
Original file line number Diff line number Diff line change
@@ -13,26 +13,26 @@ class Transaction;
namespace storage {
class ChunkedNodeGroup;
class NodeGroupCollection;
class ChunkedGroupUndoIterator;
class VersionRecordHandler;

using chunked_group_undo_op_t = void (
using version_record_handler_op_t = void (
ChunkedNodeGroup::*)(common::row_idx_t, common::row_idx_t, common::transaction_t);

using chunked_group_iterator_construct_t = std::function<std::unique_ptr<ChunkedGroupUndoIterator>(
using version_record_handler_construct_t = std::function<std::unique_ptr<VersionRecordHandler>(
common::row_idx_t, common::row_idx_t, common::node_group_idx_t, common::transaction_t)>;

// Note: these handlers are not necessarily thread-safe when used on their own
class ChunkedGroupUndoIterator {
class VersionRecordHandler {
public:
ChunkedGroupUndoIterator(NodeGroupCollection* nodeGroups, common::row_idx_t startRow,
VersionRecordHandler(NodeGroupCollection* nodeGroups, common::row_idx_t startRow,
common::row_idx_t numRows, common::transaction_t commitTS)
: startRow(startRow), numRows(numRows), commitTS(commitTS), nodeGroups(nodeGroups) {}

virtual ~ChunkedGroupUndoIterator() = default;
virtual ~VersionRecordHandler() = default;

virtual void initRollbackInsert(const transaction::Transaction* /*transaction*/) {}
virtual void finalizeRollbackInsert() {};
virtual void iterate(chunked_group_undo_op_t undoFunc) = 0;
virtual void finalizeRollbackInsert(){};
virtual void applyFuncToChunkedGroups(version_record_handler_op_t func) = 0;

protected:
common::row_idx_t startRow;
@@ -42,5 +42,14 @@ class ChunkedGroupUndoIterator {
NodeGroupCollection* nodeGroups;
};

// Abstract factory for VersionRecordHandler objects.
// This is the virtual-class replacement for the std::function-based
// construct callback: pointers to this interface are what NodeGroupCollection,
// UndoBuffer and Transaction now accept in place of the old callback pointer.
class VersionRecordHandlerData {
public:
// Virtual destructor so implementations can be destroyed through a base pointer.
virtual ~VersionRecordHandlerData() = default;

// Builds the VersionRecordHandler for the given row range.
// NOTE: the argument order here is (startRow, numRows, commitTS, nodeGroupIdx),
// whereas the concrete handler constructors take nodeGroupIdx before startRow;
// take care when forwarding arguments between the two.
// NOTE(review): startRow/numRows presumably describe a range inside the node
// group identified by nodeGroupIdx -- confirm against the implementations.
virtual std::unique_ptr<VersionRecordHandler> constructVersionRecordHandler(
common::row_idx_t startRow, common::row_idx_t numRows, common::transaction_t commitTS,
common::node_group_idx_t nodeGroupIdx) const = 0;
};

} // namespace storage
} // namespace kuzu
4 changes: 2 additions & 2 deletions src/include/storage/store/csr_node_group.h
Original file line number Diff line number Diff line change
@@ -165,12 +165,12 @@ static constexpr common::column_id_t REL_ID_COLUMN_ID = 1;
struct RelTableScanState;
class CSRNodeGroup final : public NodeGroup {
public:
class PersistentIterator : public ChunkedGroupUndoIterator {
class PersistentIterator : public VersionRecordHandler {
public:
PersistentIterator(NodeGroupCollection* nodeGroups, common::node_group_idx_t nodeGroupIdx,
common::row_idx_t startRow, common::row_idx_t numRows, common::transaction_t commitTS);

void iterate(chunked_group_undo_op_t undoFunc) override;
void applyFuncToChunkedGroups(version_record_handler_op_t func) override;
void finalizeRollbackInsert() override;

private:
4 changes: 2 additions & 2 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
@@ -82,11 +82,11 @@ static auto NODE_GROUP_SCAN_EMMPTY_RESULT = NodeGroupScanResult{};
struct TableScanState;
class NodeGroup {
public:
class ChunkedGroupIterator : public ChunkedGroupUndoIterator {
class ChunkedGroupIterator : public VersionRecordHandler {
public:
ChunkedGroupIterator(NodeGroupCollection* nodeGroups, common::node_group_idx_t nodeGroupIdx,
common::row_idx_t startRow, common::row_idx_t numRows, common::transaction_t commitTS);
void iterate(chunked_group_undo_op_t undoFunc) override;
void applyFuncToChunkedGroups(version_record_handler_op_t func) override;
void finalizeRollbackInsert() override;

protected:
10 changes: 5 additions & 5 deletions src/include/storage/store/node_group_collection.h
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ class NodeGroupCollection {
public:
NodeGroupCollection(MemoryManager& memoryManager, const std::vector<common::LogicalType>& types,
bool enableCompression, FileHandle* dataFH = nullptr, common::Deserializer* deSer = nullptr,
const chunked_group_iterator_construct_t* iteratorConstructFunc = nullptr);
const VersionRecordHandlerData* versionRecordHandlerData = nullptr);

void append(const transaction::Transaction* transaction,
const std::vector<common::ValueVector*>& vectors);
@@ -51,7 +51,7 @@ class NodeGroupCollection {
}
NodeGroup* getOrCreateNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t groupIdx, NodeGroupDataFormat format,
const chunked_group_iterator_construct_t* constructIteratorFunc_);
const VersionRecordHandlerData* versionRecordHandlerData);

void setNodeGroup(const common::node_group_idx_t nodeGroupIdx,
std::unique_ptr<NodeGroup> group) {
@@ -82,12 +82,12 @@ class NodeGroupCollection {
void pushInsertInfo(const transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow,
common::row_idx_t numRows,
const chunked_group_iterator_construct_t* constructIteratorFunc_);
const VersionRecordHandlerData* overridedVersionRecordHandlerData);

private:
void pushInsertInfo(const transaction::Transaction* transaction, NodeGroup* nodeGroup,
common::row_idx_t numRows,
const chunked_group_iterator_construct_t* constructIteratorOverrideFunc = nullptr);
const VersionRecordHandlerData* overridedVersionRecordHandlerData = nullptr);

bool enableCompression;
// Num rows in the collection regardless of deletions.
@@ -96,7 +96,7 @@ class NodeGroupCollection {
GroupCollection<NodeGroup> nodeGroups;
FileHandle* dataFH;
TableStats stats;
const chunked_group_iterator_construct_t* iteratorConstructFunc;
const VersionRecordHandlerData* versionRecordHandlerData;
};

} // namespace storage
19 changes: 14 additions & 5 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ class Transaction;
} // namespace transaction

namespace storage {
class NodeTable;

struct NodeTableScanState final : TableScanState {
// Scan state for un-committed data.
@@ -95,6 +96,18 @@ struct PKColumnScanHelper {
PrimaryKeyIndex* pkIndex;
};

// VersionRecordHandlerData implementation bound to a single NodeTable.
// NodeTable keeps an instance of this class as a member
// (versionRecordHandlerData) in place of the former iteratorConstructFunc.
class NodeTableVersionRecordHandlerData : public VersionRecordHandlerData {
public:
// Stores the table pointer as-is; explicit to prevent implicit conversion
// from NodeTable*.
explicit NodeTableVersionRecordHandlerData(NodeTable* nodeTable) : nodeTable(nodeTable) {}

// Defined out of line (NodeTable is only forward-declared at this point).
// Argument order is (startRow, numRows, commitTS, nodeGroupIdx).
std::unique_ptr<VersionRecordHandler> constructVersionRecordHandler(common::row_idx_t startRow,
common::row_idx_t numRows, common::transaction_t commitTS,
common::node_group_idx_t nodeGroupIdx) const override;

private:
// NOTE(review): presumably a non-owning back-pointer to the table that holds
// this object as a member -- confirm it cannot outlive that table.
NodeTable* nodeTable;
};

class StorageManager;
class NodeTable final : public Table {
public:
@@ -201,10 +214,6 @@ class NodeTable final : public Table {

TableStats getStats(const transaction::Transaction* transaction) const;

const chunked_group_iterator_construct_t& getIteratorConstructFunc() const {
return iteratorConstructFunc;
}

private:
void validatePkNotExists(const transaction::Transaction* transaction,
common::ValueVector* pkVector);
@@ -221,7 +230,7 @@ class NodeTable final : public Table {
std::unique_ptr<NodeGroupCollection> nodeGroups;
common::column_id_t pkColumnID;
std::unique_ptr<PrimaryKeyIndex> pkIndex;
chunked_group_iterator_construct_t iteratorConstructFunc;
NodeTableVersionRecordHandlerData versionRecordHandlerData;
};

} // namespace storage
29 changes: 26 additions & 3 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
@@ -14,12 +14,27 @@ class Transaction;
}
namespace storage {
class MemoryManager;
class RelTableData;

struct CSRHeaderColumns {
std::unique_ptr<Column> offset;
std::unique_ptr<Column> length;
};

// VersionRecordHandlerData implementation bound to a RelTableData and one
// CSRNodeGroupScanSource. RelTableData holds two instances as members
// (persistentVersionRecordHandlerData and inMemoryVersionRecordHandlerData).
class RelTableVersionRecordHandlerData : public VersionRecordHandlerData {
public:
// Stores both arguments as-is; no validation is performed here.
RelTableVersionRecordHandlerData(RelTableData* relTableData, CSRNodeGroupScanSource source)
: relTableData(relTableData), source(source) {}

// Defined out of line (RelTableData is only forward-declared at this point).
// Argument order is (startRow, numRows, commitTS, nodeGroupIdx).
// NOTE(review): presumably forwards to RelTableData::constructVersionRecordHandler,
// which takes (source, nodeGroupIdx, startRow, numRows, commitTS) -- a different
// order; confirm in the .cpp.
std::unique_ptr<VersionRecordHandler> constructVersionRecordHandler(common::row_idx_t startRow,
common::row_idx_t numRows, common::transaction_t commitTS,
common::node_group_idx_t nodeGroupIdx) const override;

private:
// NOTE(review): presumably a non-owning back-pointer to the RelTableData that
// holds this object as a member -- confirm.
RelTableData* relTableData;
// Scan source this instance was constructed with.
CSRNodeGroupScanSource source;
};

class RelTableData {
public:
RelTableData(FileHandle* dataFH, MemoryManager* mm, ShadowFile* shadowFile,
@@ -56,7 +71,7 @@ class RelTableData {
NodeGroup* getOrCreateNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx) const {
return nodeGroups->getOrCreateNodeGroup(transaction, nodeGroupIdx, NodeGroupDataFormat::CSR,
&persistentIteratorConstructFunc);
&persistentVersionRecordHandlerData);
}

common::RelMultiplicity getMultiplicity() const { return multiplicity; }
@@ -70,6 +85,11 @@ class RelTableData {

void serialize(common::Serializer& serializer) const;

std::unique_ptr<VersionRecordHandler> constructVersionRecordHandler(
CSRNodeGroupScanSource source, common::node_group_idx_t nodeGroupIdx,
common::row_idx_t startRow, common::row_idx_t numRows,
common::transaction_t commitTS) const;

private:
void initCSRHeaderColumns();
void initPropertyColumns(const catalog::TableCatalogEntry* tableEntry);
@@ -98,6 +118,9 @@ class RelTableData {
return types;
}

const RelTableVersionRecordHandlerData* getVersionRecordHandlerData(
CSRNodeGroupScanSource source);

private:
FileHandle* dataFH;
common::table_id_t tableID;
@@ -114,8 +137,8 @@ class RelTableData {
CSRHeaderColumns csrHeaderColumns;
std::vector<std::unique_ptr<Column>> columns;

chunked_group_iterator_construct_t inMemIteratorConstructFunc;
chunked_group_iterator_construct_t persistentIteratorConstructFunc;
RelTableVersionRecordHandlerData persistentVersionRecordHandlerData;
RelTableVersionRecordHandlerData inMemoryVersionRecordHandlerData;
};

} // namespace storage
9 changes: 6 additions & 3 deletions src/include/storage/undo_buffer.h
Original file line number Diff line number Diff line change
@@ -88,9 +88,11 @@ class UndoBuffer {
void createSequenceChange(catalog::SequenceCatalogEntry& sequenceEntry,
const catalog::SequenceRollbackData& data);
void createInsertInfo(common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow,
common::row_idx_t numRows, const chunked_group_iterator_construct_t* iteratorConstructFunc);
common::row_idx_t numRows,
const storage::VersionRecordHandlerData* versionRecordHandlerData);
void createDeleteInfo(common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow,
common::row_idx_t numRows, const chunked_group_iterator_construct_t* iteratorConstructFunc);
common::row_idx_t numRows,
const storage::VersionRecordHandlerData* versionRecordHandlerData);
void createVectorUpdateInfo(UpdateInfo* updateInfo, common::idx_t vectorIdx,
VectorUpdateInfo* vectorUpdateInfo);

@@ -103,7 +105,8 @@ class UndoBuffer {
uint8_t* createUndoRecord(uint64_t size);

void createVersionInfo(UndoRecordType recordType, common::row_idx_t startRow,
common::row_idx_t numRows, const chunked_group_iterator_construct_t* iteratorConstructFunc,
common::row_idx_t numRows,
const storage::VersionRecordHandlerData* versionRecordHandlerData,
common::node_group_idx_t nodeGroupIdx = 0);

void commitRecord(UndoRecordType recordType, const uint8_t* record,
11 changes: 4 additions & 7 deletions src/include/transaction/transaction.h
Original file line number Diff line number Diff line change
@@ -23,16 +23,13 @@ class VersionInfo;
class UpdateInfo;
struct VectorUpdateInfo;
class ChunkedNodeGroup;
class ChunkedGroupUndoIterator;
class VersionRecordHandler;
class VersionRecordHandlerData;
} // namespace storage
namespace transaction {
class TransactionManager;
class Transaction;

using chunked_group_iterator_construct_t =
std::function<std::unique_ptr<storage::ChunkedGroupUndoIterator>(common::row_idx_t,
common::row_idx_t, common::node_group_idx_t, common::transaction_t commitTS)>;

enum class TransactionType : uint8_t { READ_ONLY, WRITE, CHECKPOINT, DUMMY, RECOVERY };

class Transaction {
@@ -123,10 +120,10 @@ class Transaction {
const catalog::SequenceRollbackData& data) const;
void pushInsertInfo(common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow,
common::row_idx_t numRows,
const chunked_group_iterator_construct_t* constructIteratorFunc = nullptr) const;
const storage::VersionRecordHandlerData* versionRecordHandlerData) const;
void pushDeleteInfo(common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow,
common::row_idx_t numRows,
const chunked_group_iterator_construct_t* constructIteratorFunc) const;
const storage::VersionRecordHandlerData* versionRecordHandlerData) const;
void pushVectorUpdateInfo(storage::UpdateInfo& updateInfo, common::idx_t vectorIdx,
storage::VectorUpdateInfo& vectorUpdateInfo) const;

6 changes: 3 additions & 3 deletions src/storage/store/csr_node_group.cpp
Original file line number Diff line number Diff line change
@@ -15,15 +15,15 @@ namespace storage {
CSRNodeGroup::PersistentIterator::PersistentIterator(NodeGroupCollection* nodeGroups,
common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow, common::row_idx_t numRows,
common::transaction_t commitTS)
: ChunkedGroupUndoIterator(nodeGroups, startRow, numRows, commitTS), nodeGroup(nullptr) {
: VersionRecordHandler(nodeGroups, startRow, numRows, commitTS), nodeGroup(nullptr) {
if (nodeGroupIdx < nodeGroups->getNumNodeGroups()) {
nodeGroup = ku_dynamic_cast<CSRNodeGroup*>(nodeGroups->getNodeGroupNoLock(nodeGroupIdx));
}
}

void CSRNodeGroup::PersistentIterator::iterate(chunked_group_undo_op_t undoFunc) {
void CSRNodeGroup::PersistentIterator::applyFuncToChunkedGroups(version_record_handler_op_t func) {
if (nodeGroup && nodeGroup->persistentChunkGroup) {
std::invoke(undoFunc, *nodeGroup->persistentChunkGroup, startRow, numRows, commitTS);
std::invoke(func, *nodeGroup->persistentChunkGroup, startRow, numRows, commitTS);
}
}

6 changes: 3 additions & 3 deletions src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
@@ -24,13 +24,13 @@ namespace storage {
NodeGroup::ChunkedGroupIterator::ChunkedGroupIterator(NodeGroupCollection* nodeGroups,
common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow, common::row_idx_t numRows,
transaction_t commitTS)
: ChunkedGroupUndoIterator(nodeGroups, startRow, numRows, commitTS),
: VersionRecordHandler(nodeGroups, startRow, numRows, commitTS),
nodeGroup(nodeGroups->getNodeGroupNoLock(nodeGroupIdx)),
numRowsToRollback(std::min(numRows, nodeGroup->getNumRows() - startRow)) {
KU_ASSERT(startRow <= nodeGroup->getNumRows());
}

void NodeGroup::ChunkedGroupIterator::iterate(chunked_group_undo_op_t undoFunc) {
void NodeGroup::ChunkedGroupIterator::applyFuncToChunkedGroups(version_record_handler_op_t func) {
auto lock = nodeGroup->chunkedGroups.lock();
const auto [chunkedGroupIdx, startRowInChunkedGroup] =
nodeGroup->findChunkedGroupIdxFromRowIdxNoLock(startRow);
@@ -44,7 +44,7 @@ void NodeGroup::ChunkedGroupIterator::iterate(chunked_group_undo_op_t undoFunc)
auto* chunkedGroup = nodeGroup->chunkedGroups.getGroup(lock, curChunkedGroupIdx);
const auto numRowsForGroup =
std::min(numRowsLeft, chunkedGroup->getNumRows() - curStartRowIdxInChunk);
std::invoke(undoFunc, *chunkedGroup, curStartRowIdxInChunk, numRowsForGroup, commitTS);
std::invoke(func, *chunkedGroup, curStartRowIdxInChunk, numRowsForGroup, commitTS);

++curChunkedGroupIdx;
numRowsLeft -= numRowsForGroup;
18 changes: 10 additions & 8 deletions src/storage/store/node_group_collection.cpp
Original file line number Diff line number Diff line change
@@ -14,9 +14,9 @@ namespace storage {

NodeGroupCollection::NodeGroupCollection(MemoryManager& memoryManager,
const std::vector<LogicalType>& types, const bool enableCompression, FileHandle* dataFH,
Deserializer* deSer, const chunked_group_iterator_construct_t* iteratorConstructFunc)
Deserializer* deSer, const VersionRecordHandlerData* versionRecordHandlerData)
: enableCompression{enableCompression}, numTotalRows{0}, types{LogicalType::copy(types)},
dataFH{dataFH}, iteratorConstructFunc(iteratorConstructFunc) {
dataFH{dataFH}, versionRecordHandlerData(versionRecordHandlerData) {
if (deSer) {
deserialize(*deSer, memoryManager);
}
@@ -155,7 +155,7 @@ row_idx_t NodeGroupCollection::getNumTotalRows() {

NodeGroup* NodeGroupCollection::getOrCreateNodeGroup(transaction::Transaction* transaction,
node_group_idx_t groupIdx, NodeGroupDataFormat format,
const chunked_group_iterator_construct_t* constructIteratorFunc_) {
const VersionRecordHandlerData* versionRecordHandlerData) {
const auto lock = nodeGroups.lock();
while (groupIdx >= nodeGroups.getNumGroups(lock)) {
const auto currentGroupIdx = nodeGroups.getNumGroups(lock);
@@ -166,7 +166,7 @@ NodeGroup* NodeGroupCollection::getOrCreateNodeGroup(transaction::Transaction* t
enableCompression, LogicalType::copy(types)));
// push an insert of size 0 so that we can rollback the creation of this node group if
// needed
pushInsertInfo(transaction, nodeGroups.getLastGroup(lock), 0, constructIteratorFunc_);
pushInsertInfo(transaction, nodeGroups.getLastGroup(lock), 0, versionRecordHandlerData);
}
KU_ASSERT(groupIdx < nodeGroups.getNumGroups(lock));
return nodeGroups.getGroup(lock, groupIdx);
@@ -213,17 +213,19 @@ void NodeGroupCollection::rollbackInsert(common::row_idx_t numRows_, bool update

void NodeGroupCollection::pushInsertInfo(const transaction::Transaction* transaction,
NodeGroup* nodeGroup, common::row_idx_t numRows,
const chunked_group_iterator_construct_t* constructIteratorOverrideFunc) {
const VersionRecordHandlerData* overridedVersionRecordHandlerData) {
pushInsertInfo(transaction, nodeGroup->getNodeGroupIdx(), nodeGroup->getNumRows(), numRows,
constructIteratorOverrideFunc ? constructIteratorOverrideFunc : iteratorConstructFunc);
overridedVersionRecordHandlerData ? overridedVersionRecordHandlerData :
versionRecordHandlerData);
};

void NodeGroupCollection::pushInsertInfo(const transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow, common::row_idx_t numRows,
const chunked_group_iterator_construct_t* constructIteratorFunc_) {
const VersionRecordHandlerData* overridedVersionRecordHandlerData) {
// we only append to the undo buffer if the node group collection is persistent
if (dataFH && transaction->shouldAppendToUndoBuffer()) {
transaction->pushInsertInfo(nodeGroupIdx, startRow, numRows, constructIteratorFunc_);
transaction->pushInsertInfo(nodeGroupIdx, startRow, numRows,
overridedVersionRecordHandlerData);
}
}

Loading