Skip to content

Commit

Permalink
Test a few small completion threads count (#10253)
Browse files Browse the repository at this point in the history
  • Loading branch information
va-kuznecov authored Oct 9, 2024
1 parent 4c77083 commit 1120c1a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 43 deletions.
85 changes: 43 additions & 42 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,50 +236,51 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
TActorSystemCreator creator;
auto start = TMonotonic::Now();
while ((TMonotonic::Now() - start).Seconds() < 5) {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));

ui32 buffSize = 64_KB;
auto randomData = PrepareData(buffSize);
ui32 bufferPoolSize = 512;
THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(buffSize, bufferPoolSize, false, {}));
ui64 inFlight = 128;
ui32 maxQueuedCompletionActions = bufferPoolSize / 2;
ui32 completionThreadsCount = 1;
ui64 diskSize = 32_GB;

TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, NSectorMap::DM_NONE);
THolder<NPDisk::IBlockDevice> device(CreateRealBlockDevice("", *mon, 0, 0, inFlight, TDeviceMode::None,
maxQueuedCompletionActions, completionThreadsCount, sectorMap));
device->Initialize(std::make_shared<TPDiskCtx>(creator.GetActorSystem()));

TAtomic counter = 0;
const i64 totalRequests = 500;
for (i64 i = 0; i < totalRequests; i++) {
auto *completion = new TCompletionWorkerWithCounter(counter, TDuration::MicroSeconds(100));
NPDisk::TBuffer::TPtr buffer(bufferPool->Pop());
buffer->FlushAction = completion;
auto* data = buffer->Data();
switch (RandomNumber<ui32>(3)) {
case 0:
device->PreadAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr);
break;
case 1:
memcpy(data, randomData.data(), 32_KB);
device->PwriteAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr);
break;
case 2:
device->FlushAsync(completion, TReqId());
buffer->FlushAction = nullptr;
break;
default:
break;
for (auto completionThreadsCount : {0, 1, 2, 3}) {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));

ui32 buffSize = 64_KB;
auto randomData = PrepareData(buffSize);
ui32 bufferPoolSize = 512;
THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(buffSize, bufferPoolSize, false, {}));
ui64 inFlight = 128;
ui32 maxQueuedCompletionActions = bufferPoolSize / 2;
ui64 diskSize = 32_GB;

TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, NSectorMap::DM_NONE);
THolder<NPDisk::IBlockDevice> device(CreateRealBlockDevice("", *mon, 0, 0, inFlight, TDeviceMode::None,
maxQueuedCompletionActions, completionThreadsCount, sectorMap));
device->Initialize(std::make_shared<TPDiskCtx>(creator.GetActorSystem()));

TAtomic counter = 0;
const i64 totalRequests = 500;
for (i64 i = 0; i < totalRequests; i++) {
auto *completion = new TCompletionWorkerWithCounter(counter, TDuration::MicroSeconds(100));
NPDisk::TBuffer::TPtr buffer(bufferPool->Pop());
buffer->FlushAction = completion;
auto* data = buffer->Data();
switch (RandomNumber<ui32>(3)) {
case 0:
device->PreadAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr);
break;
case 1:
memcpy(data, randomData.data(), 32_KB);
device->PwriteAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr);
break;
case 2:
device->FlushAsync(completion, TReqId());
buffer->FlushAction = nullptr;
break;
default:
break;
}
}
}

Ctest << AtomicGet(counter) << Endl;
device.Destroy();
UNIT_ASSERT(AtomicGet(counter) == totalRequests);
Ctest << AtomicGet(counter) << Endl;
device.Destroy();
UNIT_ASSERT(AtomicGet(counter) == totalRequests);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig> cfg, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters)
: PCtx(std::move(pCtx)) //std::make_shared<TPDiskCtx>())
: PCtx(std::move(pCtx))
, Mon(counters, cfg->PDiskId, cfg.Get())
, DriveModel(cfg->DriveModelSeekTimeNs,
cfg->DriveModelSpeedBps,
Expand Down

0 comments on commit 1120c1a

Please sign in to comment.