Skip to content

Commit

Permalink
Fix chaindelegators save/load after 24.2->24.3 migration
Browse files Browse the repository at this point in the history
continue

continue
  • Loading branch information
robdrynkin committed Feb 10, 2025
1 parent 029c82b commit ba1fb68
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 47 deletions.
68 changes: 27 additions & 41 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,10 @@ namespace NKikimr {
FreeSpace.clear();
ui32 slotsInChunk = 0;
::Load(s, slotsInChunk);
Y_VERIFY_S(slotsInChunk == SlotsInChunk, VDiskLogPrefix
<< "slotsInChunk# " << slotsInChunk << " SlotsInChunk# " << SlotsInChunk);

SlotsInChunk = slotsInChunk;
ConstMask = BuildConstMask(VDiskLogPrefix, SlotsInChunk);

::Load(s, AllocatedSlots);
::Load(s, FreeSpace);
FreeSlotsInFreeSpace = 0;
Expand Down Expand Up @@ -473,64 +475,47 @@ namespace NKikimr {
}

void TAllChains::Save(IOutputStream *s) const {
if (StartMode == EStartMode::Loaded) {
ui32 size = ChainDelegators.size();
::Save(s, size);
for (auto& d : ChainDelegators) {
::Save(s, d);
}
} else {
std::vector<const TChainDelegator*> delegators;
for (auto &x : ChainDelegators) {
if (!x.HaveBeenUsed() && x.SlotSize < FirstLoadedSlotSize) {
continue; // preserving backward compatibility until no allocations
}
delegators.emplace_back(&x);
}

ui32 size = delegators.size();
::Save(s, size);
for (auto x : delegators) {
::Save(s, *x);
}
ui32 size = ChainDelegators.size();
::Save(s, size);
Cerr << VDiskLogPrefix << "Saving all chains, size = " << size << Endl;
for (auto& d : ChainDelegators) {
Cerr << VDiskLogPrefix << "Chain slot size = " << d.SlotSize << Endl;
::Save(s, d);
}
Cerr << VDiskLogPrefix << VDiskLogPrefix << " Saved" << Endl;
}

void TAllChains::Load(IInputStream *s) {
ui32 size = 0;
// load array size
::Load(s, size);
if (size == ChainDelegators.size()) {
Cerr << VDiskLogPrefix << "Loading all chains (simple), size = " << size << Endl;
StartMode = EStartMode::Loaded;
// load map and current map are of the same size, just load it
for (auto &x : ChainDelegators) {
::Load(s, x);
}
} else if (size < ChainDelegators.size()) {
Cerr << VDiskLogPrefix << "Loading all chains (migration), size = " << size << Endl;

// map size has been changed, run migration
StartMode = EStartMode::Migrated;
TAllChainDelegators chainDelegators = BuildChains(OldMinHugeBlobSizeInBytes);
Y_VERIFY_S(size > 0 && size == chainDelegators.size(), "size# " << size
<< " chainDelegators.size()# " << chainDelegators.size());
StartMode = EStartMode::Loaded;
using TIt = TAllChainDelegators::iterator;

// load into temporary delegators
for (auto &x : chainDelegators) {
::Load(s, x);
}
for (ui32 i = 0; i < size; ++i) {
TChainDelegator c(VDiskLogPrefix, 1, 1, ChunkSize, AppendBlockSize);
::Load(s, c);

// migrate
using TIt = TAllChainDelegators::iterator;
TIt loadedIt = chainDelegators.begin();
TIt loadedEnd = chainDelegators.end();
FirstLoadedSlotSize = loadedIt->SlotSize;
for (TIt it = ChainDelegators.begin(); it != ChainDelegators.end(); ++it) {
Y_ABORT_UNLESS(loadedIt != loadedEnd);
if (loadedIt->SlotSize == it->SlotSize) {
*it = std::move(*loadedIt);
++loadedIt;
Cerr << VDiskLogPrefix << "Loading chain, SlotsInChunk = " << c.ChainPtr->SlotsInChunk << Endl;

for (TIt it = ChainDelegators.begin(); it != ChainDelegators.end(); ++it) {
if (it->SlotsInChunk == c.ChainPtr->SlotsInChunk) {
it->ChainPtr = std::move(c.ChainPtr);
break;
}
}
}
Y_ABORT_UNLESS(loadedIt == loadedEnd);
} else {
// entry point size rollback case
Y_ABORT_UNLESS(size > ChainDelegators.size());
Expand All @@ -540,6 +525,7 @@ namespace NKikimr {
<< " loadedSize# " << size
<< " curChainDelegatorsSize# " << curChainDelegatorsSize);
}
Cerr << VDiskLogPrefix << "Loaded" << Endl;
}

void TAllChains::GetOwnedChunks(TSet<TChunkIdx>& chunks) const {
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ namespace NKikimr {
using TChunkID = ui32;
using TFreeSpace = TMap<TChunkID, TMask>;

public:
static constexpr ui32 MaxNumberOfSlots = 32768; // it's not a good idea to have more slots than this
const TString VDiskLogPrefix;
const ui32 SlotsInChunk;
const TMask ConstMask; // mask of 'all slots are free'
ui32 SlotsInChunk;
TMask ConstMask; // mask of 'all slots are free'
TFreeSpace FreeSpace;
TFreeSpace LockedChunks;
ui32 AllocatedSlots = 0;
ui32 FreeSlotsInFreeSpace = 0;

public:
static TMask BuildConstMask(const TString &prefix, ui32 slotsInChunk);

public:
Expand Down Expand Up @@ -205,6 +205,8 @@ namespace NKikimr {
// Builds a map of BlobSize -> THugeSlotsMap::TSlotInfo for THugeBlobCtx
std::shared_ptr<THugeSlotsMap> BuildHugeSlotsMap() const;

TAllChainDelegators ChainDelegators;

private:

TAllChainDelegators BuildChains(ui32 minHugeBlobInBytes) const;
Expand All @@ -228,8 +230,6 @@ namespace NKikimr {
const ui32 MaxBlobInBytes;
const ui32 Overhead;
EStartMode StartMode = EStartMode::Empty;
ui32 FirstLoadedSlotSize = 0;
TAllChainDelegators ChainDelegators;
TSearchTable SearchTable;
};

Expand All @@ -246,6 +246,7 @@ namespace NKikimr {
const TString VDiskLogPrefix;
const ui32 FreeChunksReservation;
TFreeChunks FreeChunks;
public:
TAllChains Chains;

public:
Expand Down
126 changes: 125 additions & 1 deletion ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


// change to Cerr if you want logging
#define STR Cnull
#define STR Cerr

namespace NKikimr {

Expand Down Expand Up @@ -485,6 +485,130 @@ namespace NKikimr {
Y_UNIT_TEST(RollbackFrom_New_To_Old) {
Write_SaveEntryPoint_Restart(EWrite_SaveEntryPoint_Restart::RollbackFrom_New_To_Old);
}

NPrivate::TChainLayoutBuilder GetChainLayoutBuilder(ui32 minHugeBlobInBytes) {
ui32 appendBlockSize = 4064;
ui32 milestoneBlobInBytes = 524288;
ui32 maxBlobInBytes = 10485760;
ui32 overhead = 8u;

const ui32 startBlocks = minHugeBlobInBytes / appendBlockSize;
const ui32 mileStoneBlocks = milestoneBlobInBytes / appendBlockSize;

ui32 endBlocks = maxBlobInBytes / appendBlockSize;
endBlocks += !(endBlocks * appendBlockSize == maxBlobInBytes);

STR << "startBlocks# " << startBlocks
<< " mileStoneBlocks# " << mileStoneBlocks
<< " endBlocks# " << endBlocks << "\n";

return NPrivate::TChainLayoutBuilder(startBlocks, mileStoneBlocks, endBlocks, overhead);
}

Y_UNIT_TEST(ChainLength_4064) {
auto builder = GetChainLayoutBuilder(4064);
UNIT_ASSERT_VALUES_EQUAL(builder.GetLayout().size(), 53);
}

Y_UNIT_TEST(ChainLength_97537) {
auto builder = GetChainLayoutBuilder(97537);
UNIT_ASSERT_VALUES_EQUAL(builder.GetLayout().size(), 39);
}

Y_UNIT_TEST(ChainLength_524257) {
auto builder = GetChainLayoutBuilder(524257);
UNIT_ASSERT_VALUES_EQUAL(builder.GetLayout().size(), 26);
}

Y_UNIT_TEST(FailingRecovery) {
ui32 chunkSize = 135249920;
ui32 appendBlockSize = 4064;
ui32 milestoneBlobInBytes = 524288;
ui32 maxBlobInBytes = 10485760;
ui32 overhead = 8u;

TString serialized;

{
ui32 oldMinHugeBlobSizeInBytes = 524288;
ui32 minHugeBlobInBytes = 524288; // 24.2
THeap heap("vdisk", chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobSizeInBytes, milestoneBlobInBytes,
maxBlobInBytes, overhead, 0);

serialized = heap.Serialize();
}

{
ui32 oldMinHugeBlobSizeInBytes = 524288;
ui32 minHugeBlobInBytes = appendBlockSize; // update 24.2 -> 24.3
THeap heap("vdisk", chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobSizeInBytes, milestoneBlobInBytes,
maxBlobInBytes, overhead, 0);

{
// emulate old behavior
TVector<TChainDelegator> delegators;
for (auto& d: heap.Chains.ChainDelegators) {
if (d.SlotSize >= 524288) {
delegators.emplace_back(std::move(d));
}
}
heap.Chains.ChainDelegators = std::move(delegators);
}

heap.ParseFromString(serialized);

serialized = heap.Serialize();
}

{
ui32 oldMinHugeBlobSizeInBytes = 524288;
ui32 minHugeBlobInBytes = appendBlockSize; // 24.3
THeap heap("vdisk", chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobSizeInBytes, milestoneBlobInBytes,
maxBlobInBytes, overhead, 0);
heap.ParseFromString(serialized);

heap.AddChunk(1);
heap.AddChunk(2);
heap.AddChunk(3);
THugeSlot slot;
ui32 slotSize;
UNIT_ASSERT(heap.Allocate(97536 - 1, &slot, &slotSize));
UNIT_ASSERT(heap.Allocate(113792 - 1, &slot, &slotSize));
UNIT_ASSERT(heap.Allocate(170688 - 1, &slot, &slotSize));

{
// emulate old behavior
TVector<TChainDelegator> delegators;
for (auto& d: heap.Chains.ChainDelegators) {
if (d.SlotSize >= 524288 || d.SlotSize == 97536 || d.SlotSize == 113792 || d.SlotSize == 170688) {
delegators.emplace_back(std::move(d));
}
}
heap.Chains.ChainDelegators = std::move(delegators);
}

serialized = heap.Serialize();
}

{
ui32 oldMinHugeBlobSizeInBytes = 524288;
ui32 minHugeBlobInBytes = appendBlockSize; // 24.3
THeap heap("vdisk", chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobSizeInBytes, milestoneBlobInBytes,
maxBlobInBytes, overhead, 0);
heap.ParseFromString(serialized);

THugeSlot slot;
ui32 slotSize;
UNIT_ASSERT(heap.Allocate(97537 - 1, &slot, &slotSize));
Cerr << slot.GetChunkId() << Endl;
UNIT_ASSERT(heap.Allocate(113792 - 1, &slot, &slotSize));
Cerr << slot.GetChunkId() << Endl;
UNIT_ASSERT(heap.Allocate(170688 - 1, &slot, &slotSize));
Cerr << slot.GetChunkId() << Endl;

serialized = heap.Serialize();
}
}
}

} // NKikimr

0 comments on commit ba1fb68

Please sign in to comment.