Skip to content

Commit

Permalink
ReadOnly pdisk (#12256)
Browse files Browse the repository at this point in the history
  • Loading branch information
SammyVimes authored Dec 16, 2024
1 parent 35acee0 commit 9d8fcc7
Show file tree
Hide file tree
Showing 34 changed files with 804 additions and 63 deletions.
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ namespace NKikimr::NStorage {
std::map<TVSlotId, TVDiskRecord> LocalVDisks;
THashMap<TActorId, TVSlotId> VDiskIdByActor;
std::map<TVSlotId, ui64> SlayInFlight;
std::set<ui32> PDiskRestartInFlight;
// PDiskId -> is another restart required after the current restart.
std::unordered_map<ui32, bool> PDiskRestartInFlight;
TIntrusiveList<TVDiskRecord, TUnreportedMetricTag> VDisksWithUnreportedMetrics;

void DestroyLocalVDisk(TVDiskRecord& vdisk);
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ void TNodeWarden::RenderWholePage(IOutputStream& out) {
}
if (!PDiskRestartInFlight.empty()) {
DIV() {
out << "PDiskRestartInFlight# " << FormatList(PDiskRestartInFlight);
out << "PDiskRestartInFlight# [";
for (const auto& item : PDiskRestartInFlight) {
out << "pdiskId:" << item.first << " -> needsAnotherRestart: " << item.second << ", ";
}
out << "]";
}
}
if (!PDisksWaitingToStart.empty()) {
Expand Down
36 changes: 32 additions & 4 deletions ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ namespace NKikimr::NStorage {
}
pdiskConfig->MaxCommonLogChunks = deviceType == NPDisk::DEVICE_TYPE_ROT ? MaxCommonLogChunksHDD : MaxCommonLogChunksSSD;

if (pdisk.HasReadOnly()) {
pdiskConfig->ReadOnly = pdisk.GetReadOnly();
}

// Path scheme: "SectorMap:unique_name[:3000]"
// where '3000' is device size of in GiB.
if (path.Contains(":")) {
Expand Down Expand Up @@ -281,13 +285,25 @@ namespace NKikimr::NStorage {
}

void TNodeWarden::OnPDiskRestartFinished(ui32 pdiskId, NKikimrProto::EReplyStatus status) {
if (PDiskRestartInFlight.erase(pdiskId) == 0) {
auto it = PDiskRestartInFlight.find(pdiskId);
if (it == PDiskRestartInFlight.end()) {
// There was no restart in progress.
return;
}

bool requiresAnotherRestart = it->second;

PDiskRestartInFlight.erase(it);

const TPDiskKey pdiskKey(LocalNodeId, pdiskId);

if (requiresAnotherRestart) {
auto it = LocalPDisks.find(pdiskKey);
auto pdisk = it->second.Record;
DoRestartLocalPDisk(pdisk);
return;
}

const TVSlotId from(pdiskKey.NodeId, pdiskKey.PDiskId, 0);
const TVSlotId to(pdiskKey.NodeId, pdiskKey.PDiskId, Max<ui32>());

Expand Down Expand Up @@ -330,11 +346,12 @@ namespace NKikimr::NStorage {

STLOG(PRI_NOTICE, BS_NODE, NW75, "DoRestartLocalPDisk", (PDiskId, pdiskId));

const auto [_, inserted] = PDiskRestartInFlight.emplace(pdiskId);
const auto [restartIt, inserted] = PDiskRestartInFlight.try_emplace(pdiskId, false);

if (!inserted) {
STLOG(PRI_NOTICE, BS_NODE, NW76, "Restart already in progress", (PDiskId, pdiskId));
// Restart is already in progress.
// Restart is already in progress, but we will need to make a new restart, as the configuration changed.
restartIt->second = true;
return;
}

Expand Down Expand Up @@ -381,12 +398,23 @@ namespace NKikimr::NStorage {
continue;
}

const NKikimrBlobStorage::EEntityStatus entityStatus = pdisk.HasEntityStatus()
NKikimrBlobStorage::EEntityStatus entityStatus = pdisk.HasEntityStatus()
? pdisk.GetEntityStatus()
: NKikimrBlobStorage::INITIAL;

const TPDiskKey key(pdisk);

if (pdisk.HasReadOnly()) {
if (auto it = LocalPDisks.find({pdisk.GetNodeID(), pdisk.GetPDiskID()}); it != LocalPDisks.end()) {
auto& record = it->second;

if (!record.Record.HasReadOnly() || record.Record.GetReadOnly() != pdisk.GetReadOnly()) {
// Changing read-only flag requires restart.
entityStatus = NKikimrBlobStorage::RESTART;
}
}
}

switch (entityStatus) {
case NKikimrBlobStorage::RESTART:
if (auto it = LocalPDisks.find({pdisk.GetNodeID(), pdisk.GetPDiskID()}); it != LocalPDisks.end()) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ struct TEvReadLogResult : public TEventLocal<TEvReadLogResult, TEvBlobStorage::E
TLogPosition Position;
TLogPosition NextPosition;
bool IsEndOfLog;
ui32 LastGoodChunkIdx = 0;
ui64 LastGoodSectorIdx = 0;

TStatusFlags StatusFlags;
TString ErrorReason;
TOwner Owner;
Expand Down
17 changes: 16 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,21 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
auto [actor, actorSystem, pDiskActor, metadata] = *params;
delete params;

TPDiskConfig *cfg = actor->Cfg.Get();

if (cfg->ReadOnly) {
TString readOnlyError = "PDisk is in read-only mode";
STLOGX(*actorSystem, PRI_ERROR, BS_PDISK, BSP01, "Formatting error", (What, readOnlyError));
actorSystem->Send(pDiskActor, new TEvPDiskFormattingFinished(false, readOnlyError));
return nullptr;
}

NPDisk::TKey chunkKey;
NPDisk::TKey logKey;
NPDisk::TKey sysLogKey;
SafeEntropyPoolRead(&chunkKey, sizeof(NKikimr::NPDisk::TKey));
SafeEntropyPoolRead(&logKey, sizeof(NKikimr::NPDisk::TKey));
SafeEntropyPoolRead(&sysLogKey, sizeof(NKikimr::NPDisk::TKey));
TPDiskConfig *cfg = actor->Cfg.Get();

try {
try {
Expand Down Expand Up @@ -464,6 +472,13 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters);
std::shared_ptr<TPDiskCtx> pCtx = std::get<3>(*params);

if (cfg->ReadOnly) {
TString readOnlyError = "PDisk is in read-only mode";
STLOGX(*pCtx->ActorSystem, PRI_ERROR, BS_PDISK, BSP01, "Formatting error", (What, readOnlyError));
pCtx->ActorSystem->Send(pCtx->PDiskActor, new TEvPDiskFormattingFinished(false, readOnlyError));
return nullptr;
}

THolder<NPDisk::TPDisk> pDisk(new NPDisk::TPDisk(pCtx, cfg, counters));

pDisk->Initialize();
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ class TPDisk;
IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon,
ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags,
ui32 maxQueuedCompletionActions, ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap,
TPDisk * const pdisk = nullptr);
TPDisk * const pdisk = nullptr, bool readOnly = false);
IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags,
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk = nullptr);
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk = nullptr, bool readOnly = false);

} // NPDisk
} // NKikimr
19 changes: 12 additions & 7 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ class TRealBlockDevice : public IBlockDevice {
TFlightControl FlightControl;
TAtomicBlockCounter QuitCounter;
TString LastWarning;
bool ReadOnly;
TDeque<IAsyncIoOperation*> Trash;
TMutex TrashMutex;

Expand All @@ -843,7 +844,7 @@ class TRealBlockDevice : public IBlockDevice {
public:
TRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap)
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, bool readOnly)
: Mon(mon)
, Path(path)
, CompletionThreads(nullptr)
Expand All @@ -864,6 +865,7 @@ class TRealBlockDevice : public IBlockDevice {
, DeviceInFlight(FastClp2(deviceInFlight))
, FlightControl(CountTrailingZeroBits(DeviceInFlight))
, LastWarning(IsPowerOf2(deviceInFlight) ? "" : "Device inflight must be a power of 2")
, ReadOnly(readOnly)
{
if (sectorMap) {
DriveData = TDriveData();
Expand Down Expand Up @@ -1038,6 +1040,7 @@ class TRealBlockDevice : public IBlockDevice {
}

void TrimSync(ui32 size, ui64 offset) override {
Y_ABORT_UNLESS(!ReadOnly);
IAsyncIoOperation* op = IoContext->CreateAsyncIoOperation(nullptr, {}, nullptr);
IoContext->PreparePTrim(op, size, offset);
IsTrimEnabled = IoContext->DoTrim(op);
Expand All @@ -1064,6 +1067,7 @@ class TRealBlockDevice : public IBlockDevice {
void PwriteAsync(const void *data, ui64 size, ui64 offset, TCompletionAction *completionAction, TReqId reqId,
NWilson::TTraceId *traceId) override {
Y_ABORT_UNLESS(completionAction);
Y_ABORT_UNLESS(!ReadOnly);
if (!IsInitialized) {
completionAction->Release(PCtx->ActorSystem);
return;
Expand All @@ -1080,6 +1084,7 @@ class TRealBlockDevice : public IBlockDevice {

void FlushAsync(TCompletionAction *completionAction, TReqId reqId) override {
Y_ABORT_UNLESS(completionAction);
Y_ABORT_UNLESS(!ReadOnly);
if (!IsInitialized) {
completionAction->Release(PCtx->ActorSystem);
return;
Expand Down Expand Up @@ -1348,9 +1353,9 @@ class TCachedBlockDevice : public TRealBlockDevice {
public:
TCachedBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk)
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk, bool readOnly)
: TRealBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags,
maxQueuedCompletionActions, completionThreadsCount, sectorMap)
maxQueuedCompletionActions, completionThreadsCount, sectorMap, readOnly)
, ReadsInFly(0)
, PDisk(pdisk)
{}
Expand Down Expand Up @@ -1486,14 +1491,14 @@ class TCachedBlockDevice : public TRealBlockDevice {

IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) {
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk, bool readOnly) {
return new TCachedBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags,
maxQueuedCompletionActions, completionThreadsCount, sectorMap, pdisk);
maxQueuedCompletionActions, completionThreadsCount, sectorMap, pdisk, readOnly);
}

IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags,
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk) {
IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, 1, sectorMap, pdisk);
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk, bool readOnly) {
IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, 1, sectorMap, pdisk, readOnly);
device->Initialize(std::make_shared<TPDiskCtx>(actorSystem));
return device;
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ struct TPDiskConfig : public TThrRefBase {

bool MetadataOnly = false;

bool ReadOnly = false;

TPDiskConfig(ui64 pDiskGuid, ui32 pdiskId, ui64 pDiskCategory)
: TPDiskConfig({}, pDiskGuid, pdiskId, pDiskCategory)
{}
Expand Down
100 changes: 95 additions & 5 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig
, BlockDevice(CreateRealBlockDevice(cfg->GetDevicePath(), Mon,
HPCyclesMs(ReorderingMs), DriveModel.SeekTimeNs(), cfg->DeviceInFlight,
TDeviceMode::LockFile | (cfg->UseSpdkNvmeDriver ? TDeviceMode::UseSpdk : 0),
cfg->MaxQueuedCompletionActions, cfg->CompletionThreadsCount, cfg->SectorMap, this))
cfg->MaxQueuedCompletionActions, cfg->CompletionThreadsCount, cfg->SectorMap, this, cfg->ReadOnly))
, Cfg(cfg)
, CreationTime(TInstant::Now())
, ExpectedSlotCount(cfg->ExpectedSlotCount)
Expand Down Expand Up @@ -1725,14 +1725,14 @@ void TPDisk::WriteDiskFormat(ui64 diskSizeBytes, ui32 sectorSizeBytes, ui32 user
// Owner initialization
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

void TPDisk::ReplyErrorYardInitResult(TYardInit &evYardInit, const TString &str) {
void TPDisk::ReplyErrorYardInitResult(TYardInit &evYardInit, const TString &str, NKikimrProto::EReplyStatus status) {
TStringStream error;
error << "PDiskId# " << PCtx->PDiskId << " YardInit error for VDiskId# " << evYardInit.VDisk.ToStringWOGeneration()
<< " reason# " << str;
P_LOG(PRI_ERROR, BPD01, error.Str());
ui64 writeBlockSize = ForsetiOpPieceSizeCached;
ui64 readBlockSize = ForsetiOpPieceSizeCached;
PCtx->ActorSystem->Send(evYardInit.Sender, new NPDisk::TEvYardInitResult(NKikimrProto::ERROR,
PCtx->ActorSystem->Send(evYardInit.Sender, new NPDisk::TEvYardInitResult(status,
DriveModel.SeekTimeNs() / 1000ull, DriveModel.Speed(TDriveModel::OP_TYPE_READ),
DriveModel.Speed(TDriveModel::OP_TYPE_WRITE), readBlockSize, writeBlockSize,
DriveModel.BulkWriteBlockSize(),
Expand Down Expand Up @@ -1767,8 +1767,12 @@ bool TPDisk::YardInitForKnownVDisk(TYardInit &evYardInit, TOwner owner) {
ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(ownerData.OperationLog, "YardInitForKnownVDisk, OwnerId# " << owner
<< ", evYardInit# " << evYardInit.ToString());

TFirstUncommitted firstUncommitted = CommonLogger->FirstUncommitted.load();
ownerData.LogEndPosition = TOwnerData::TLogEndPosition(firstUncommitted.ChunkIdx, firstUncommitted.SectorIdx);
if (Cfg->ReadOnly) {
ownerData.LogEndPosition = TOwnerData::TLogEndPosition(LastInitialChunkIdx, LastInitialSectorIdx);
} else {
TFirstUncommitted firstUncommitted = CommonLogger->FirstUncommitted.load();
ownerData.LogEndPosition = TOwnerData::TLogEndPosition(firstUncommitted.ChunkIdx, firstUncommitted.SectorIdx);
}

ownerData.OwnerRound = evYardInit.OwnerRound;
TOwnerRound ownerRound = evYardInit.OwnerRound;
Expand Down Expand Up @@ -1892,6 +1896,11 @@ void TPDisk::YardInitFinish(TYardInit &evYardInit) {
return;
}

if (Cfg->ReadOnly) {
ReplyErrorYardInitResult(evYardInit, "PDisk is in ReadOnly mode. Marker# BPD47", NKikimrProto::CORRUPTED);
return;
}

// Make sure owner round never decreases
// Allocate quota for the owner
// TODO(cthulhu): don't allocate more owners than expected
Expand Down Expand Up @@ -3445,6 +3454,16 @@ void TPDisk::EnqueueAll() {

while (InputQueue.GetWaitingSize() > 0) {
TRequestBase* request = InputQueue.Pop();

if (Cfg->ReadOnly && HandleReadOnlyIfWrite(request)) {
LOG_DEBUG(*PCtx->ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " ReqId# %" PRIu64
" got write request in ReadOnly mode type# %" PRIu64,
(ui32)PCtx->PDiskId, (ui64)request->ReqId.Id, (ui32)request->GetType());

delete request;
return;
}

P_LOG(PRI_TRACE, BPD83, "EnqueueAll, pop from InputQueue", (requestType, TypeName(*request)), (alreadyProcessedReqs, processedReqs));
AtomicSub(InputQueueCost, request->Cost);
if (IsQueuePaused) {
Expand Down Expand Up @@ -3753,6 +3772,77 @@ void TPDisk::UpdateMinLogCostNs() {
}
}

// Handles write requests (only in read-only mode). Returns true, if request is a write request.
bool TPDisk::HandleReadOnlyIfWrite(TRequestBase *request) {
const TActorId& sender = request->Sender;
TString errorReason = "PDisk is in read-only mode";

switch (request->GetType()) {
// Reads and other operations that can be processed in read-only mode.
case ERequestType::RequestLogRead:
case ERequestType::RequestLogReadContinue:
case ERequestType::RequestLogReadResultProcess:
case ERequestType::RequestLogSectorRestore:
case ERequestType::RequestChunkRead:
case ERequestType::RequestChunkReadPiece:
case ERequestType::RequestYardInit:
case ERequestType::RequestCheckSpace:
case ERequestType::RequestHarakiri:
case ERequestType::RequestYardSlay:
case ERequestType::RequestYardControl:
case ERequestType::RequestWhiteboartReport:
case ERequestType::RequestHttpInfo:
case ERequestType::RequestStopDevice:
case ERequestType::RequestReadMetadata:
case ERequestType::RequestInitialReadMetadataResult:
case ERequestType::RequestUndelivered:
case ERequestType::RequestNop:
case ERequestType::RequestConfigureScheduler:
case ERequestType::RequestPushUnformattedMetadataSector:
case ERequestType::RequestContinueReadMetadata:
return false;

// Can't be processed in read-only mode.
case ERequestType::RequestLogWrite: {
TLogWrite &ev = *static_cast<TLogWrite*>(request);
NPDisk::TEvLogResult* result = new NPDisk::TEvLogResult(NKikimrProto::CORRUPTED, 0, errorReason);
result->Results.push_back(NPDisk::TEvLogResult::TRecord(ev.Lsn, ev.Cookie));
PCtx->ActorSystem->Send(sender, result);
ev.Replied = true;
return true;
}
case ERequestType::RequestChunkWrite: {
TChunkWrite &ev = *static_cast<TChunkWrite*>(request);
SendChunkWriteError(ev, errorReason, NKikimrProto::CORRUPTED);
return true;
}
case ERequestType::RequestChunkReserve:
PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkReserveResult(NKikimrProto::CORRUPTED, 0, errorReason));
return true;
case ERequestType::RequestChunkLock:
PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkLockResult(NKikimrProto::CORRUPTED, {}, 0, errorReason));
return true;
case ERequestType::RequestChunkUnlock:
PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkUnlockResult(NKikimrProto::CORRUPTED, 0, errorReason));
return true;
case ERequestType::RequestChunkForget:
PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkForgetResult(NKikimrProto::CORRUPTED, 0, errorReason));
return true;

case ERequestType::RequestWriteMetadata:
case ERequestType::RequestWriteMetadataResult:
case ERequestType::RequestTryTrimChunk:
case ERequestType::RequestReleaseChunks:
case ERequestType::RequestChunkWritePiece:
case ERequestType::RequestChunkTrim:
case ERequestType::RequestAskForCutLog:
case ERequestType::RequestCommitLogChunks:
case ERequestType::RequestLogCommitDone:
// These requests don't require response.
return true;
}
}

void TPDisk::AddCbs(ui32 ownerId, EGate gate, const char *gateName, ui64 minBudget) {
if (!ForsetiScheduler.GetCbs(ownerId, gate)) {
NSchLab::TCbs cbs;
Expand Down
Loading

0 comments on commit 9d8fcc7

Please sign in to comment.