Skip to content

Commit

Permalink
Make undo buffer delete info also operate at table level
Browse files Browse the repository at this point in the history
  • Loading branch information
royi-luo committed Nov 13, 2024
1 parent 30bb4ca commit 2f3f293
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 40 deletions.
3 changes: 3 additions & 0 deletions src/include/storage/store/csr_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ class CSRNodeGroup final : public NodeGroup {

void serialize(common::Serializer& serializer) override;

void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS) override;

private:
void initScanForCommittedPersistent(transaction::Transaction* transaction,
RelTableScanState& relScanState, CSRNodeGroupScanState& nodeGroupScanState) const;
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,11 @@ class NodeGroup {

void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS);
virtual void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS);

void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_);

virtual void checkpoint(MemoryManager& memoryManager, NodeGroupCheckpointState& state);

Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/store/node_group_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ class NodeGroupCollection {

void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS);
void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS);

void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);

void clear() {
const auto lock = nodeGroups.lock();
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,12 @@ class NodeTable final : public Table {

void rollbackInsert(transaction::Transaction* transaction, common::row_idx_t startRow,
common::row_idx_t numRows_, common::node_group_idx_t nodeGroupIdx);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS);
void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS);

common::node_group_idx_t getNumCommittedNodeGroups() const {
return nodeGroups->getNumNodeGroups();
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ class RelTable final : public Table {
static common::offset_t getCommittedOffset(common::offset_t uncommittedOffset,
common::offset_t maxCommittedOffset);

void detachDeleteForCSRRels(transaction::Transaction* transaction,
const RelTableData* tableData, const RelTableData* reverseTableData,
RelTableScanState* relDataReadState, RelTableDeleteState* deleteState);
void detachDeleteForCSRRels(transaction::Transaction* transaction, RelTableData* tableData,
RelTableData* reverseTableData, RelTableScanState* relDataReadState,
RelTableDeleteState* deleteState);

void checkRelMultiplicityConstraint(transaction::Transaction* transaction,
const TableInsertState& state) const;
Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RelTableData {
const common::ValueVector& relIDVector, common::column_id_t columnID,
const common::ValueVector& dataVector) const;
bool delete_(transaction::Transaction* transaction, common::ValueVector& boundNodeIDVector,
const common::ValueVector& relIDVector) const;
const common::ValueVector& relIDVector);
void addColumn(transaction::Transaction* transaction, TableAddColumnState& addColumnState);

bool checkIfNodeHasRels(transaction::Transaction* transaction,
Expand Down Expand Up @@ -67,8 +67,12 @@ class RelTableData {
common::row_idx_t numRows_);
void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS);
void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS);

void serialize(common::Serializer& serializer) const;

Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/store/version_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ class VersionInfo {

void append(const transaction::Transaction* transaction, common::row_idx_t startRow,
common::row_idx_t numRows);
bool delete_(const transaction::Transaction* transaction, ChunkedNodeGroup* chunkedNodeGroup,
common::row_idx_t rowIdx);
bool delete_(const transaction::Transaction* transaction, common::row_idx_t rowIdx);

void getSelVectorToScan(common::transaction_t startTS, common::transaction_t transactionID,
common::SelectionVector& selVector, common::row_idx_t startRow,
Expand Down
6 changes: 4 additions & 2 deletions src/include/storage/undo_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ class UndoBuffer {
common::row_idx_t startRow, common::row_idx_t numRows);
void createInsertInfo(ChunkedNodeGroup* nodeGroup, common::row_idx_t startRow,
common::row_idx_t numRows);
void createDeleteInfo(ChunkedNodeGroup* nodeGroup, common::row_idx_t startRow,
common::row_idx_t numRows);
void createDeleteInfo(NodeTable* nodeTable, common::node_group_idx_t nodeGroupIdx,
common::row_idx_t startRow, common::row_idx_t numRows);
void createDeleteInfo(RelTableData* relTableData, common::node_group_idx_t nodeGroupIdx,
common::row_idx_t startRow, common::row_idx_t numRows);
void createVectorUpdateInfo(UpdateInfo* updateInfo, common::idx_t vectorIdx,
VectorUpdateInfo* vectorUpdateInfo);

Expand Down
6 changes: 4 additions & 2 deletions src/include/transaction/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ class Transaction {
common::row_idx_t startRow, common::row_idx_t numRows) const;
void pushInsertInfo(storage::ChunkedNodeGroup* nodeGroup, common::row_idx_t startRow,
common::row_idx_t numRows) const;
void pushDeleteInfo(storage::ChunkedNodeGroup* nodeGroup, common::row_idx_t startRow,
common::row_idx_t numRows) const;
void pushDeleteInfo(storage::RelTableData* relTableData, common::node_group_idx_t nodeGroupIdx,
common::row_idx_t startRow, common::row_idx_t numRows) const;
void pushDeleteInfo(storage::NodeTable* nodeTable, common::node_group_idx_t nodeGroupIdx,
common::row_idx_t startRow, common::row_idx_t numRows) const;
void pushVectorUpdateInfo(storage::UpdateInfo& updateInfo, common::idx_t vectorIdx,
storage::VectorUpdateInfo& vectorUpdateInfo) const;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/chunked_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ bool ChunkedNodeGroup::delete_(const Transaction* transaction, row_idx_t rowIdxI
if (!versionInfo) {
versionInfo = std::make_unique<VersionInfo>();
}
return versionInfo->delete_(transaction, this, rowIdxInChunk);
return versionInfo->delete_(transaction, rowIdxInChunk);
}

void ChunkedNodeGroup::addColumn(Transaction* transaction,
Expand Down
13 changes: 13 additions & 0 deletions src/storage/store/csr_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,5 +949,18 @@ void CSRNodeGroup::finalizeCheckpoint(const UniqLock& lock) {
csrIndex.reset();
}

void CSRNodeGroup::commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS) {
KU_ASSERT(numRows_ == 1);
if (persistentChunkGroup) {
if (startRow < persistentChunkGroup->getNumRows()) {
persistentChunkGroup->commitDelete(startRow, numRows_, commitTS);
return;
}
startRow -= persistentChunkGroup->getNumRows();
}
NodeGroup::commitDelete(startRow, numRows_, commitTS);
}

} // namespace storage
} // namespace kuzu
33 changes: 32 additions & 1 deletion src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,19 @@ void NodeGroup::rollbackInsert(common::row_idx_t startRow, common::row_idx_t num
numRows = startRow;
}

void NodeGroup::rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_) {
KU_ASSERT(numRows_ == 1);
const auto lock = chunkedGroups.lock();
const auto [startChunkedGroupIdx, startRowIdxInChunk] =
findChunkedGroupIdxFromRowIdx(lock, startRow);
if (startChunkedGroupIdx == UINT32_MAX) {
return;
}
KU_ASSERT(startChunkedGroupIdx < chunkedGroups.getNumGroups(lock));
auto* startChunkedGroup = chunkedGroups.getGroup(lock, startChunkedGroupIdx);
startChunkedGroup->rollbackDelete(startRowIdxInChunk, numRows_);
}

void NodeGroup::commitInsert(row_idx_t startRow, row_idx_t numRows_,
common::transaction_t commitTS) {
const auto lock = chunkedGroups.lock();
Expand All @@ -329,6 +342,24 @@ void NodeGroup::commitInsert(row_idx_t startRow, row_idx_t numRows_,
}
}

void NodeGroup::commitDelete(row_idx_t startRow, row_idx_t numRows_,
common::transaction_t commitTS) {
const auto lock = chunkedGroups.lock();
auto [startChunkedGroupIdx, startRowIdxInChunk] = findChunkedGroupIdxFromRowIdx(lock, startRow);

auto numRowsLeft = numRows_;
while (numRowsLeft > 0 && startChunkedGroupIdx < chunkedGroups.getNumGroups(lock)) {
auto* nodeGroup = chunkedGroups.getGroup(lock, startChunkedGroupIdx);
const auto numRowsForGroup =
std::min(numRowsLeft, nodeGroup->getNumRows() - startRowIdxInChunk);
nodeGroup->commitDelete(startRowIdxInChunk, numRowsForGroup, commitTS);

++startChunkedGroupIdx;
numRowsLeft -= numRowsForGroup;
startRowIdxInChunk = 0;
}
}

void NodeGroup::checkpoint(MemoryManager& memoryManager, NodeGroupCheckpointState& state) {
// We don't need to consider deletions here, as they are flushed separately as metadata.
// TODO(Guodong): A special case can be all rows are deleted or rollbacked, then we can skip
Expand Down Expand Up @@ -421,7 +452,7 @@ std::unique_ptr<VersionInfo> NodeGroup::checkpointVersionInfo(const UniqLock& lo
// TODO(Guodong): Optimize the for loop here to directly acess the version info.
for (auto i = 0u; i < chunkedGroup->getNumRows(); i++) {
if (chunkedGroup->isDeleted(transaction, i)) {
checkpointVersionInfo->delete_(transaction, nullptr, currRow + i);
checkpointVersionInfo->delete_(transaction, currRow + i);
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion src/storage/store/node_group_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ void NodeGroupCollection::rollbackInsert(common::row_idx_t startRow, common::row
if (startRow == 0) {
// if we are in the middle of rollback
// the index in the group collection of a node group may not match the nodeGroupIdx
// regardless this should be fast node groups should be inserted in close to increasing
// regardless this should be fast as node groups should be inserted in close to increasing
// order
const auto& nodeGroupVector = nodeGroups.getAllGroups(lock);
const auto idxToRemove = safeIntegerConversion<idx_t>(
Expand All @@ -218,12 +218,25 @@ void NodeGroupCollection::rollbackInsert(common::row_idx_t startRow, common::row
}
}

void NodeGroupCollection::rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx) {
const auto lock = nodeGroups.lock();
KU_ASSERT(nodeGroupIdx < nodeGroups.getNumGroups(lock));
nodeGroups.getGroup(lock, nodeGroupIdx)->rollbackDelete(startRow, numRows_);
}

void NodeGroupCollection::commitInsert(row_idx_t startRow, row_idx_t numRows_,
node_group_idx_t nodeGroupIdx, common::transaction_t commitTS) {
const auto lock = nodeGroups.lock();
nodeGroups.getGroup(lock, nodeGroupIdx)->commitInsert(startRow, numRows_, commitTS);
}

void NodeGroupCollection::commitDelete(row_idx_t startRow, row_idx_t numRows_,
node_group_idx_t nodeGroupIdx, common::transaction_t commitTS) {
const auto lock = nodeGroups.lock();
nodeGroups.getGroup(lock, nodeGroupIdx)->commitDelete(startRow, numRows_, commitTS);
}

void NodeGroupCollection::serialize(Serializer& ser) {
ser.writeDebuggingInfo("node_groups");
nodeGroups.serializeGroups(ser);
Expand Down
20 changes: 19 additions & 1 deletion src/storage/store/node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ bool NodeTable::delete_(Transaction* transaction, TableDeleteState& deleteState)
const auto rowIdxInGroup =
nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
isDeleted = nodeGroups->getNodeGroup(nodeGroupIdx)->delete_(transaction, rowIdxInGroup);
if (transaction->shouldAppendToUndoBuffer()) {
transaction->pushDeleteInfo(this, nodeGroupIdx, rowIdxInGroup, 1);
}
}
if (isDeleted) {
hasChanges = true;
Expand Down Expand Up @@ -492,7 +495,12 @@ void NodeTable::commit(Transaction* transaction, LocalTable* localTable) {
const auto rowIdxInGroup =
startNodeOffset + nodeOffset -
StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
nodeGroups->getNodeGroup(nodeGroupIdx)->delete_(transaction, rowIdxInGroup);
[[maybe_unused]] const bool isDeleted =
nodeGroups->getNodeGroup(nodeGroupIdx)->delete_(transaction, rowIdxInGroup);
KU_ASSERT(isDeleted);
if (transaction->shouldAppendToUndoBuffer()) {
transaction->pushDeleteInfo(this, nodeGroupIdx, rowIdxInGroup, 1);
}
}
}
}
Expand Down Expand Up @@ -561,11 +569,21 @@ void NodeTable::rollbackInsert(transaction::Transaction* transaction, common::ro
nodeGroups->rollbackInsert(startRow, numRows_, nodeGroupIdx);
}

void NodeTable::rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx) {
nodeGroups->rollbackDelete(startRow, numRows_, nodeGroupIdx);
}

void NodeTable::commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS) {
nodeGroups->commitInsert(startRow, numRows_, nodeGroupIdx, commitTS);
}

void NodeTable::commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS) {
nodeGroups->commitDelete(startRow, numRows_, nodeGroupIdx, commitTS);
}

TableStats NodeTable::getStats(const Transaction* transaction) const {
auto stats = nodeGroups->getStats();
const auto localTable = transaction->getLocalStorage()->getLocalTable(tableID,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/store/rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ void RelTable::throwIfNodeHasRels(Transaction* transaction, RelDataDirection dir
}
}

void RelTable::detachDeleteForCSRRels(Transaction* transaction, const RelTableData* tableData,
const RelTableData* reverseTableData, RelTableScanState* relDataReadState,
void RelTable::detachDeleteForCSRRels(Transaction* transaction, RelTableData* tableData,
RelTableData* reverseTableData, RelTableScanState* relDataReadState,
RelTableDeleteState* deleteState) {
const auto localTable = transaction->getLocalStorage()->getLocalTable(tableID,
LocalStorage::NotExistAction::RETURN_NULL);
Expand Down
18 changes: 16 additions & 2 deletions src/storage/store/rel_table_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ bool RelTableData::update(Transaction* transaction, ValueVector& boundNodeIDVect
}

bool RelTableData::delete_(Transaction* transaction, ValueVector& boundNodeIDVector,
const ValueVector& relIDVector) const {
const ValueVector& relIDVector) {
const auto boundNodePos = boundNodeIDVector.state->getSelVector()[0];
const auto relIDPos = relIDVector.state->getSelVector()[0];
if (boundNodeIDVector.isNull(boundNodePos) || relIDVector.isNull(relIDPos)) {
Expand All @@ -96,7 +96,11 @@ bool RelTableData::delete_(Transaction* transaction, ValueVector& boundNodeIDVec
const auto boundNodeOffset = boundNodeIDVector.getValue<nodeID_t>(boundNodePos).offset;
const auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(boundNodeOffset);
auto& csrNodeGroup = getNodeGroup(nodeGroupIdx)->cast<CSRNodeGroup>();
return csrNodeGroup.delete_(transaction, source, rowIdx);
bool isDeleted = csrNodeGroup.delete_(transaction, source, rowIdx);
if (isDeleted && transaction->shouldAppendToUndoBuffer()) {
transaction->pushDeleteInfo(this, nodeGroupIdx, rowIdx, 1);
}
return isDeleted;
}

void RelTableData::addColumn(Transaction* transaction, TableAddColumnState& addColumnState) {
Expand Down Expand Up @@ -217,11 +221,21 @@ void RelTableData::rollbackInsert(common::row_idx_t startRow, common::row_idx_t
nodeGroups->rollbackInsert(startRow, numRows_, nodeGroupIdx);
}

void RelTableData::rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx) {
nodeGroups->rollbackDelete(startRow, numRows_, nodeGroupIdx);
}

void RelTableData::commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS) {
nodeGroups->commitInsert(startRow, numRows_, nodeGroupIdx, commitTS);
}

void RelTableData::commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx, common::transaction_t commitTS) {
nodeGroups->commitDelete(startRow, numRows_, nodeGroupIdx, commitTS);
}

void RelTableData::serialize(Serializer& serializer) const {
nodeGroups->serialize(serializer);
}
Expand Down
9 changes: 2 additions & 7 deletions src/storage/store/version_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,7 @@ void VersionInfo::append(const transaction::Transaction* transaction, const row_
}
}

bool VersionInfo::delete_(const transaction::Transaction* transaction,
ChunkedNodeGroup* chunkedNodeGroup, const row_idx_t rowIdx) {
bool VersionInfo::delete_(const transaction::Transaction* transaction, const row_idx_t rowIdx) {
auto [vectorIdx, rowIdxInVector] =
StorageUtils::getQuotientRemainder(rowIdx, DEFAULT_VECTOR_CAPACITY);
auto& vectorVersionInfo = getOrCreateVersionInfo(vectorIdx);
Expand All @@ -377,11 +376,7 @@ bool VersionInfo::delete_(const transaction::Transaction* transaction,
// ALWAYS_INSERTED to avoid checking the version in the future.
vectorVersionInfo.insertionStatus = VectorVersionInfo::InsertionStatus::ALWAYS_INSERTED;
}
const auto deleted = vectorVersionInfo.delete_(transaction->getID(), rowIdxInVector);
if (deleted && transaction->shouldAppendToUndoBuffer()) {
transaction->pushDeleteInfo(chunkedNodeGroup, rowIdx, 1);
}
return deleted;
return vectorVersionInfo.delete_(transaction->getID(), rowIdxInVector);
}

void VersionInfo::getSelVectorToScan(const transaction_t startTS, const transaction_t transactionID,
Expand Down
Loading

0 comments on commit 2f3f293

Please sign in to comment.