Skip to content

Commit

Permalink
Clean up node group
Browse files Browse the repository at this point in the history
  • Loading branch information
royi-luo committed Nov 15, 2024
1 parent faa989d commit 4d09b6d
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 138 deletions.
6 changes: 4 additions & 2 deletions src/include/storage/store/chunked_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ class ChunkedNodeGroup {

void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS);
void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_);
void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS);
void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS);

uint64_t getEstimatedMemoryUsage() const;

Expand Down
9 changes: 4 additions & 5 deletions src/include/storage/store/csr_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,11 @@ class CSRNodeGroup final : public NodeGroup {

void serialize(common::Serializer& serializer) override;

void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS, CSRNodeGroupScanSource source) override;
void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS, CSRNodeGroupScanSource source) override;

private:
std::pair<common::idx_t, common::row_idx_t> actionOnChunkedGroups(const common::UniqLock& lock,
common::row_idx_t startRow, common::row_idx_t numRows_, common::transaction_t commitTS,
CSRNodeGroupScanSource source, chunked_group_transaction_operation_t operation) override;

void initScanForCommittedPersistent(const transaction::Transaction* transaction,
RelTableScanState& relScanState, CSRNodeGroupScanState& nodeGroupScanState) const;
void initScanForCommittedInMem(RelTableScanState& relScanState,
Expand Down
25 changes: 19 additions & 6 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,15 @@ class NodeGroup {

void flush(transaction::Transaction* transaction, FileHandle& dataFH);

virtual void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS, CSRNodeGroupScanSource source);
virtual void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
void commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS, CSRNodeGroupScanSource source);

void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_);
void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
CSRNodeGroupScanSource source);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
CSRNodeGroupScanSource source);

virtual void checkpoint(MemoryManager& memoryManager, NodeGroupCheckpointState& state);

Expand Down Expand Up @@ -195,9 +197,20 @@ class NodeGroup {

common::node_group_idx_t getNodeGroupIdx() const { return nodeGroupIdx; }

protected:
static constexpr auto INVALID_CHUNKED_GROUP_IDX = UINT32_MAX;
static constexpr auto INVALID_START_ROW_IDX = UINT64_MAX;

using chunked_group_transaction_operation_t = void (
ChunkedNodeGroup::*)(common::row_idx_t, common::row_idx_t, common::transaction_t);
virtual std::pair<common::idx_t, common::row_idx_t> actionOnChunkedGroups(
const common::UniqLock& lock, common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS, CSRNodeGroupScanSource source,
chunked_group_transaction_operation_t operation);

private:
std::pair<common::idx_t, common::row_idx_t> findChunkedGroupIdxFromRowIdx(
const common::UniqLock& lock, common::row_idx_t rowIdx);
std::pair<common::idx_t, common::row_idx_t> findChunkedGroupIdxFromRowIdxNoLock(
common::row_idx_t rowIdx);
ChunkedNodeGroup* findChunkedGroupFromRowIdx(const common::UniqLock& lock,
common::row_idx_t rowIdx);
ChunkedNodeGroup* findChunkedGroupFromRowIdxNoLock(common::row_idx_t rowIdx);
Expand Down
6 changes: 4 additions & 2 deletions src/include/storage/store/node_group_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ class NodeGroupCollection {
CSRNodeGroupScanSource source = CSRNodeGroupScanSource::NONE);

void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
common::node_group_idx_t nodeGroupIdx,
CSRNodeGroupScanSource source = CSRNodeGroupScanSource::NONE);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
common::node_group_idx_t nodeGroupIdx,
CSRNodeGroupScanSource source = CSRNodeGroupScanSource::NONE);

void clear() {
const auto lock = nodeGroups.lock();
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ class RelTableData {
void pushInsertInfo(transaction::Transaction* transaction, const CSRNodeGroup& nodeGroup,
common::row_idx_t numRows_, CSRNodeGroupScanSource source);
void rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
common::node_group_idx_t nodeGroupIdx, CSRNodeGroupScanSource source);
void rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::node_group_idx_t nodeGroupIdx);
common::node_group_idx_t nodeGroupIdx, CSRNodeGroupScanSource source);
void commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
CSRNodeGroupScanSource source, common::node_group_idx_t nodeGroupIdx,
common::transaction_t commitTS);
Expand Down
6 changes: 4 additions & 2 deletions src/storage/store/chunked_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ bool ChunkedNodeGroup::hasUpdates() const {
return false;
}

void ChunkedNodeGroup::rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_) {
void ChunkedNodeGroup::rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t) {
if (startRow == 0) {
setNumRows(0);
versionInfo.reset();
Expand All @@ -453,7 +454,8 @@ void ChunkedNodeGroup::commitDelete(row_idx_t startRow, row_idx_t numRows_,
versionInfo->commitDelete(startRow, numRows_, commitTS);
}

void ChunkedNodeGroup::rollbackDelete(row_idx_t startRow, row_idx_t numRows_) {
void ChunkedNodeGroup::rollbackDelete(row_idx_t startRow, row_idx_t numRows_,
common::transaction_t) {
versionInfo->rollbackDelete(startRow, numRows_);
}

Expand Down
24 changes: 10 additions & 14 deletions src/storage/store/csr_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -951,23 +951,19 @@ void CSRNodeGroup::finalizeCheckpoint(const UniqLock& lock) {
csrIndex.reset();
}

void CSRNodeGroup::commitInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS, CSRNodeGroupScanSource source) {
std::pair<idx_t, row_idx_t> CSRNodeGroup::actionOnChunkedGroups(const common::UniqLock& lock,
common::row_idx_t startRow, common::row_idx_t numRows_, common::transaction_t commitTS,
CSRNodeGroupScanSource source, chunked_group_transaction_operation_t operation) {
if (source == CSRNodeGroupScanSource::COMMITTED_PERSISTENT) {
persistentChunkGroup->commitInsert(startRow, numRows_, commitTS);
} else {
KU_ASSERT(source == CSRNodeGroupScanSource::COMMITTED_IN_MEMORY);
NodeGroup::commitInsert(startRow, numRows_, commitTS, source);
}
}

void CSRNodeGroup::commitDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
common::transaction_t commitTS, CSRNodeGroupScanSource source) {
if (source == CSRNodeGroupScanSource::COMMITTED_PERSISTENT) {
persistentChunkGroup->commitDelete(startRow, numRows_, commitTS);
KU_ASSERT(persistentChunkGroup || (numRows_ == 0));
if (persistentChunkGroup) {
std::invoke(operation, *persistentChunkGroup, startRow, numRows_, commitTS);
}
return {UINT32_MAX, UINT32_MAX};
} else {
KU_ASSERT(source == CSRNodeGroupScanSource::COMMITTED_IN_MEMORY);
NodeGroup::commitDelete(startRow, numRows_, commitTS, source);
return NodeGroup::actionOnChunkedGroups(lock, startRow, numRows_, commitTS, source,
operation);
}
}

Expand Down
135 changes: 57 additions & 78 deletions src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,82 +282,69 @@ void NodeGroup::flush(Transaction* transaction, FileHandle& dataFH) {
chunkedGroups.resize(lock, 1);
}

void NodeGroup::rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_) {
const auto lock = chunkedGroups.lock();
std::pair<idx_t, row_idx_t> NodeGroup::actionOnChunkedGroups(const common::UniqLock& lock,
common::row_idx_t startRow, common::row_idx_t numRows_, common::transaction_t commitTS,
CSRNodeGroupScanSource, chunked_group_transaction_operation_t operation) {
const auto [startChunkedGroupIdx, startRowIdxInChunk] =
findChunkedGroupIdxFromRowIdx(lock, startRow);
if (startChunkedGroupIdx == UINT32_MAX) {
return;
findChunkedGroupIdxFromRowIdxNoLock(startRow);
if (startChunkedGroupIdx != INVALID_CHUNKED_GROUP_IDX) {
auto curChunkedGroupIdx = startChunkedGroupIdx;
auto curStartRowIdxInChunk = startRowIdxInChunk;

auto numRowsLeft = numRows_;
while (numRowsLeft > 0 && curChunkedGroupIdx < chunkedGroups.getNumGroups(lock)) {
auto* chunkedGroup = chunkedGroups.getGroup(lock, curChunkedGroupIdx);
const auto numRowsForGroup =
std::min(numRowsLeft, chunkedGroup->getNumRows() - curStartRowIdxInChunk);
std::invoke(operation, *chunkedGroup, curStartRowIdxInChunk, numRowsForGroup, commitTS);

++curChunkedGroupIdx;
numRowsLeft -= numRowsForGroup;
curStartRowIdxInChunk = 0;
}
}
const auto numChunkedGroups = chunkedGroups.getNumGroups(lock);
KU_ASSERT(startChunkedGroupIdx < numChunkedGroups);
const bool shouldRemoveStartChunk = (startRowIdxInChunk == 0);
const auto numChunksToRemove =
numChunkedGroups - startChunkedGroupIdx - (shouldRemoveStartChunk ? 0 : 1);

for (common::node_group_idx_t i = numChunkedGroups - numChunksToRemove; i < numChunkedGroups;
++i) {
auto* startChunkedGroup = chunkedGroups.getGroup(lock, i);
startChunkedGroup->rollbackInsert(0, startChunkedGroup->getNumRows());
}
return {startChunkedGroupIdx, startRowIdxInChunk};
}

static constexpr common::transaction_t UNUSED_COMMIT_TS = INVALID_TRANSACTION;

chunkedGroups.removeTrailingGroups(lock, numChunksToRemove);
void NodeGroup::rollbackInsert(common::row_idx_t startRow, common::row_idx_t numRows_,
CSRNodeGroupScanSource source) {
const auto lock = chunkedGroups.lock();
const auto [startChunkedGroupIdx, startRowIdxInChunk] = actionOnChunkedGroups(lock, startRow,
numRows_, UNUSED_COMMIT_TS, source, &ChunkedNodeGroup::rollbackInsert);
if (startChunkedGroupIdx != INVALID_CHUNKED_GROUP_IDX) {
const auto numChunkedGroups = chunkedGroups.getNumGroups(lock);
KU_ASSERT(startChunkedGroupIdx < numChunkedGroups);
const bool shouldRemoveStartChunk = (startRowIdxInChunk == 0);
const auto numChunksToRemove =
numChunkedGroups - startChunkedGroupIdx - (shouldRemoveStartChunk ? 0 : 1);
chunkedGroups.removeTrailingGroups(lock, numChunksToRemove);

if (!shouldRemoveStartChunk) {
auto* startChunkedGroup = chunkedGroups.getGroup(lock, startChunkedGroupIdx);
startChunkedGroup->rollbackInsert(startRowIdxInChunk,
std::min(numRows_, startChunkedGroup->getNumRows() - startRowIdxInChunk));
numRows = startRow;
}
numRows = startRow;
}

void NodeGroup::rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_) {
KU_ASSERT(numRows_ == 1);
void NodeGroup::rollbackDelete(common::row_idx_t startRow, common::row_idx_t numRows_,
CSRNodeGroupScanSource source) {
const auto lock = chunkedGroups.lock();
const auto [startChunkedGroupIdx, startRowIdxInChunk] =
findChunkedGroupIdxFromRowIdx(lock, startRow);
if (startChunkedGroupIdx == UINT32_MAX) {
return;
}
KU_ASSERT(startChunkedGroupIdx < chunkedGroups.getNumGroups(lock));
auto* startChunkedGroup = chunkedGroups.getGroup(lock, startChunkedGroupIdx);
startChunkedGroup->rollbackDelete(startRowIdxInChunk, numRows_);
actionOnChunkedGroups(lock, startRow, numRows_, UNUSED_COMMIT_TS, source,
&ChunkedNodeGroup::rollbackDelete);
}

void NodeGroup::commitInsert(row_idx_t startRow, row_idx_t numRows_, common::transaction_t commitTS,
CSRNodeGroupScanSource) {
CSRNodeGroupScanSource source) {
const auto lock = chunkedGroups.lock();
auto [startChunkedGroupIdx, startRowIdxInChunk] = findChunkedGroupIdxFromRowIdx(lock, startRow);

auto numRowsLeft = numRows_;
while (numRowsLeft > 0 && startChunkedGroupIdx < chunkedGroups.getNumGroups(lock)) {
auto* nodeGroup = chunkedGroups.getGroup(lock, startChunkedGroupIdx);
const auto numRowsForGroup =
std::min(numRowsLeft, nodeGroup->getNumRows() - startRowIdxInChunk);
nodeGroup->commitInsert(startRowIdxInChunk, numRowsForGroup, commitTS);

++startChunkedGroupIdx;
numRowsLeft -= numRowsForGroup;
startRowIdxInChunk = 0;
}
actionOnChunkedGroups(lock, startRow, numRows_, commitTS, source,
&ChunkedNodeGroup::commitInsert);
}

void NodeGroup::commitDelete(row_idx_t startRow, row_idx_t numRows_, common::transaction_t commitTS,
CSRNodeGroupScanSource) {
CSRNodeGroupScanSource source) {
const auto lock = chunkedGroups.lock();
auto [startChunkedGroupIdx, startRowIdxInChunk] = findChunkedGroupIdxFromRowIdx(lock, startRow);

auto numRowsLeft = numRows_;
while (numRowsLeft > 0 && startChunkedGroupIdx < chunkedGroups.getNumGroups(lock)) {
auto* nodeGroup = chunkedGroups.getGroup(lock, startChunkedGroupIdx);
const auto numRowsForGroup =
std::min(numRowsLeft, nodeGroup->getNumRows() - startRowIdxInChunk);
nodeGroup->commitDelete(startRowIdxInChunk, numRowsForGroup, commitTS);

++startChunkedGroupIdx;
numRowsLeft -= numRowsForGroup;
startRowIdxInChunk = 0;
}
actionOnChunkedGroups(lock, startRow, numRows_, commitTS, source,
&ChunkedNodeGroup::commitDelete);
}

void NodeGroup::checkpoint(MemoryManager& memoryManager, NodeGroupCheckpointState& state) {
Expand Down Expand Up @@ -534,44 +521,36 @@ std::unique_ptr<NodeGroup> NodeGroup::deserialize(MemoryManager& memoryManager,
}
}

std::pair<idx_t, row_idx_t> NodeGroup::findChunkedGroupIdxFromRowIdx(const UniqLock& lock,
row_idx_t rowIdx) {
if (chunkedGroups.isEmpty(lock)) {
return {UINT32_MAX, UINT64_MAX};
std::pair<idx_t, row_idx_t> NodeGroup::findChunkedGroupIdxFromRowIdxNoLock(row_idx_t rowIdx) {
if (chunkedGroups.getNumGroupsNoLock() == 0) {
return {INVALID_CHUNKED_GROUP_IDX, INVALID_START_ROW_IDX};

Check warning on line 526 in src/storage/store/node_group.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/node_group.cpp#L526

Added line #L526 was not covered by tests
}
const auto numRowsInFirstGroup = chunkedGroups.getFirstGroup(lock)->getNumRows();
const auto numRowsInFirstGroup = chunkedGroups.getFirstGroupNoLock()->getNumRows();
if (rowIdx < numRowsInFirstGroup) {
return {0, rowIdx};
}
rowIdx -= numRowsInFirstGroup;
const auto chunkedGroupIdx = rowIdx / ChunkedNodeGroup::CHUNK_CAPACITY + 1;
const auto rowIdxInChunk = rowIdx % ChunkedNodeGroup::CHUNK_CAPACITY;
if (chunkedGroupIdx >= chunkedGroups.getNumGroups(lock)) {
return {UINT32_MAX, UINT64_MAX};
if (chunkedGroupIdx >= chunkedGroups.getNumGroupsNoLock()) {
return {INVALID_CHUNKED_GROUP_IDX, INVALID_START_ROW_IDX};
}
return {chunkedGroupIdx, rowIdxInChunk};
}

ChunkedNodeGroup* NodeGroup::findChunkedGroupFromRowIdx(const UniqLock& lock, row_idx_t rowIdx) {
const auto [chunkedGroupIdx, rowIdxInChunkedGroup] =
findChunkedGroupIdxFromRowIdx(lock, rowIdx);
if (chunkedGroupIdx == UINT32_MAX) {
findChunkedGroupIdxFromRowIdxNoLock(rowIdx);
if (chunkedGroupIdx == INVALID_CHUNKED_GROUP_IDX) {
return nullptr;
}
return chunkedGroups.getGroup(lock, chunkedGroupIdx);
}

ChunkedNodeGroup* NodeGroup::findChunkedGroupFromRowIdxNoLock(row_idx_t rowIdx) {
if (chunkedGroups.getNumGroupsNoLock() == 0) {
return nullptr;
}
const auto numRowsInFirstGroup = chunkedGroups.getFirstGroupNoLock()->getNumRows();
if (rowIdx < numRowsInFirstGroup) {
return chunkedGroups.getFirstGroupNoLock();
}
rowIdx -= numRowsInFirstGroup;
const auto chunkedGroupIdx = rowIdx / ChunkedNodeGroup::CHUNK_CAPACITY + 1;
if (chunkedGroupIdx >= chunkedGroups.getNumGroupsNoLock()) {
const auto [chunkedGroupIdx, rowIdxInChunkedGroup] =
findChunkedGroupIdxFromRowIdxNoLock(rowIdx);
if (chunkedGroupIdx == INVALID_CHUNKED_GROUP_IDX) {
return nullptr;
}
return chunkedGroups.getGroupNoLock(chunkedGroupIdx);
Expand Down
Loading

0 comments on commit 4d09b6d

Please sign in to comment.