Skip to content

Commit

Permalink
Implement configurable number of completion threads (#9715)
Browse files Browse the repository at this point in the history
  • Loading branch information
va-kuznecov authored Oct 4, 2024
1 parent f2805c6 commit e55564f
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 104 deletions.
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class TPDisk;

IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon,
ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags,
ui32 maxQueuedCompletionActions, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk = nullptr);
ui32 maxQueuedCompletionActions, ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap,
TPDisk * const pdisk = nullptr);
IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags,
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk = nullptr);

Expand Down
221 changes: 141 additions & 80 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,12 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(buffSize, bufferPoolSize, false, {}));
ui64 inFlight = 128;
ui32 maxQueuedCompletionActions = bufferPoolSize / 2;
ui32 completionThreadsCount = 1;
ui64 diskSize = 32_GB;

TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, NSectorMap::DM_NONE);
THolder<NPDisk::IBlockDevice> device(CreateRealBlockDevice("", *mon, 0, 0, inFlight, TDeviceMode::None,
maxQueuedCompletionActions, sectorMap));
maxQueuedCompletionActions, completionThreadsCount, sectorMap));
device->Initialize(std::make_shared<TPDiskCtx>(creator.GetActorSystem()));

TAtomic counter = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct TCompletionAction {
// to BlockDevice from Exec() and it's more safe to use WhiteList to allow only
// LogWrite and ChunkWrite to be executed from GetThread
bool ShouldBeExecutedInCompletionThread = true;
bool CanBeExecutedInAdditionalCompletionThread = false;

mutable NLWTrace::TOrbit Orbit;
protected:
Expand Down
35 changes: 18 additions & 17 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,16 @@ TCompletionChunkReadPart::TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<
, PayloadReadSize(payloadReadSize)
, CommonBufferOffset(commonBufferOffset)
, CumulativeCompletion(cumulativeCompletion)
, ChunkNonce(CumulativeCompletion->GetChunkNonce())
, Buffer(PDisk->BufferPool->Pop())
, IsTheLastPart(isTheLastPart)
, Span(std::move(span))
{
TCompletionAction::CanBeExecutedInAdditionalCompletionThread = true;

TBufferWithGaps *commonBuffer = CumulativeCompletion->GetCommonBuffer();
Destination = commonBuffer->RawDataPtr(CommonBufferOffset, PayloadReadSize);

if (!IsTheLastPart) {
CumulativeCompletion->AddPart();
}
Expand Down Expand Up @@ -166,8 +172,6 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
Read->Offset + CommonBufferOffset, PayloadReadSize, firstSector, lastSector, sectorOffset);
Y_ABORT_UNLESS(isOk);

TBufferWithGaps *commonBuffer = CumulativeCompletion->GetCommonBuffer();
ui8 *destination = commonBuffer->RawDataPtr(CommonBufferOffset, PayloadReadSize);

ui8* source = Buffer->Data();

Expand All @@ -183,8 +187,6 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
sectorOffset = 0;
}

ui64 chunkNonce = CumulativeCompletion->GetChunkNonce();

ui32 beginBadUserOffset = 0xffffffff;
ui32 endBadUserOffset = 0xffffffff;
ui32 userSectorSize = format.SectorPayloadSize();
Expand All @@ -193,7 +195,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {

TSectorRestorator restorator(false, 1, false,
format, PDisk->PCtx.get(), &PDisk->Mon, PDisk->BufferPool.Get());
ui64 lastNonce = Min((ui64)0, chunkNonce - 1);
ui64 lastNonce = Min((ui64)0, ChunkNonce - 1);
restorator.Restore(source, format.Offset(Read->ChunkIdx, sectorIdx), format.MagicDataChunk, lastNonce,
Read->Owner);

Expand All @@ -211,7 +213,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
<< " for owner# " << Read->Owner
<< " beginBadUserOffet# " << beginBadUserOffset << " endBadUserOffset# " << endBadUserOffset
<< " due to multiple sectors with incorrect hashes. Marker# BPC001");
commonBuffer->AddGap(beginBadUserOffset, endBadUserOffset);
CumulativeCompletion->AddGap(beginBadUserOffset, endBadUserOffset);
beginBadUserOffset = 0xffffffff;
endBadUserOffset = 0xffffffff;
}
Expand All @@ -221,35 +223,35 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {

// Decrypt data
if (beginBadUserOffset != 0xffffffff) {
memset(destination, 0, sectorPayloadSize);
memset(Destination, 0, sectorPayloadSize);
} else {
TDataSectorFooter *footer = (TDataSectorFooter*) (source + format.SectorSize - sizeof(TDataSectorFooter));
if (footer->Nonce != chunkNonce + sectorIdx) {
if (footer->Nonce != ChunkNonce + sectorIdx) {
ui32 userOffset = sectorIdx * userSectorSize;
LOG_INFO_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PCtx->PDiskId
<< " ReqId# " << Read->ReqId
<< " Can't read chunk chunkIdx# " << Read->ChunkIdx
<< " for owner# " << Read->Owner
<< " nonce mismatch: expected# " << (ui64)(chunkNonce + sectorIdx)
<< " nonce mismatch: expected# " << (ui64)(ChunkNonce + sectorIdx)
<< ", on-disk# " << (ui64)footer->Nonce
<< " for userOffset# " << userOffset
<< " ! Marker# BPC002");
if (beginBadUserOffset == 0xffffffff) {
beginBadUserOffset = userOffset;
}
endBadUserOffset = beginUserOffset + userSectorSize;
memset(destination, 0, sectorPayloadSize);
memset(Destination, 0, sectorPayloadSize);
} else {
cypher.StartMessage(footer->Nonce);
if (sectorOffset > 0 || intptr_t(destination) % 32) {
if (sectorOffset > 0 || intptr_t(Destination) % 32) {
cypher.InplaceEncrypt(source, sectorOffset + sectorPayloadSize);
if (CommonBufferOffset == 0 || !IsTheLastPart) {
memcpy(destination, source + sectorOffset, sectorPayloadSize);
memcpy(Destination, source + sectorOffset, sectorPayloadSize);
} else {
memcpy(destination, source, sectorPayloadSize);
memcpy(Destination, source, sectorPayloadSize);
}
} else {
cypher.Encrypt(destination, source, sectorPayloadSize);
cypher.Encrypt(Destination, source, sectorPayloadSize);
}
if (CanarySize > 0) {
ui32 canaryPosition = sectorOffset + sectorPayloadSize;
Expand All @@ -259,7 +261,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
}
}
}
destination += sectorPayloadSize;
Destination += sectorPayloadSize;
source += format.SectorSize;
PayloadReadSize -= sectorPayloadSize;
sectorPayloadSize = Min(format.SectorPayloadSize(), PayloadReadSize);
Expand All @@ -273,7 +275,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
<< " for owner# " << Read->Owner
<< " beginBadUserOffet# " << beginBadUserOffset << " endBadUserOffset# " << endBadUserOffset
<< " due to multiple sectors with incorrect hashes/nonces. Marker# BPC003");
commonBuffer->AddGap(beginBadUserOffset, endBadUserOffset);
CumulativeCompletion->AddGap(beginBadUserOffset, endBadUserOffset);
beginBadUserOffset = 0xffffffff;
endBadUserOffset = 0xffffffff;
}
Expand Down Expand Up @@ -407,4 +409,3 @@ void TChunkTrimCompletion::Exec(TActorSystem *actorSystem) {

} // NPDisk
} // NKikimr

10 changes: 9 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class TCompletionChunkRead : public TCompletionAction {
TPDisk *PDisk;
TIntrusivePtr<TChunkRead> Read;
TBufferWithGaps CommonBuffer;
TMutex CommonBufferMutex; // used to protect CommonBuffer when gaps are being add
TAtomic PartsPending;
TAtomic Deletes;
std::function<void()> OnDestroy;
Expand Down Expand Up @@ -206,6 +207,11 @@ class TCompletionChunkRead : public TCompletionAction {
return &CommonBuffer;
}

void AddGap(ui32 start, ui32 end) {
TGuard<TMutex> g(CommonBufferMutex);
CommonBuffer.AddGap(start, end);
}

ui64 GetChunkNonce() {
return ChunkNonce;
}
Expand All @@ -228,12 +234,14 @@ class TCompletionChunkReadPart : public TCompletionAction {
ui64 PayloadReadSize;
ui64 CommonBufferOffset;
TCompletionChunkRead *CumulativeCompletion;
ui64 ChunkNonce;
ui8 *Destination = nullptr;
TBuffer::TPtr Buffer;
bool IsTheLastPart;
NWilson::TSpan Span;
public:
TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, ui64 rawReadSize, ui64 payloadReadSize,
ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart,
ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart,
NWilson::TSpan&& span);


Expand Down
8 changes: 7 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ struct TPDiskConfig : public TThrRefBase {

NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColorBorder = NKikimrBlobStorage::TPDiskSpaceColor::GREEN;

ui32 CompletionThreadsCount = 1;

bool MetadataOnly = false;

TPDiskConfig(ui64 pDiskGuid, ui32 pdiskId, ui64 pDiskCategory)
Expand Down Expand Up @@ -310,6 +312,7 @@ struct TPDiskConfig : public TThrRefBase {
str << " YellowLogChunksMultiplier# " << YellowLogChunksMultiplier << x;
str << " MaxMetadataMegabytes# " << MaxMetadataMegabytes << x;
str << " SpaceColorBorder# " << SpaceColorBorder << x;
str << " CompletionThreadsCount# " << CompletionThreadsCount << x;
str << "}";
return str.Str();
}
Expand Down Expand Up @@ -394,8 +397,11 @@ struct TPDiskConfig : public TThrRefBase {
limit = Max<ui32>(13, limit);
ChunkBaseLimit = limit;
}

if (cfg->HasCompletionThreadsCount()) {
CompletionThreadsCount = cfg->GetCompletionThreadsCount();
}
}
};

} // NKikimr

Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ TDriveEstimator::TDriveEstimator(const TString filename)
, ActorSystemCreator(new TActorSystemCreator)
, ActorSystem(ActorSystemCreator->GetActorSystem())
, QueueDepth(4)
, Device(CreateRealBlockDevice(filename, PDiskMon, 50, 0, QueueDepth, TDeviceMode::LockFile, 128, nullptr, nullptr))
, Device(CreateRealBlockDevice(filename, PDiskMon, 50, 0, QueueDepth, TDeviceMode::LockFile, 128, 1, nullptr, nullptr))
, BufferPool(CreateBufferPool(BufferSize, 1, false, {}))
, Buffer(BufferPool->Pop())
{
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig
, BlockDevice(CreateRealBlockDevice(cfg->GetDevicePath(), Mon,
HPCyclesMs(ReorderingMs), DriveModel.SeekTimeNs(), cfg->DeviceInFlight,
TDeviceMode::LockFile | (cfg->UseSpdkNvmeDriver ? TDeviceMode::UseSpdk : 0),
cfg->MaxQueuedCompletionActions, cfg->SectorMap, this))
cfg->MaxQueuedCompletionActions, cfg->CompletionThreadsCount, cfg->SectorMap, this))
, Cfg(cfg)
, CreationTime(TInstant::Now())
, ExpectedSlotCount(cfg->ExpectedSlotCount)
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/blobstorage_pdisk_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,6 @@ message TPDiskConfig {
optional uint64 ExpectedSlotCount = 2001; // Number of slots to calculate per-vdisk disk space limit.

optional uint32 ChunkBaseLimit = 2002; // Free chunk permille that triggers Cyan color (e.g. 100 is 10%). Between 130 (default) and 13.
};

optional uint32 CompletionThreadsCount = 2003;
};

0 comments on commit e55564f

Please sign in to comment.