Skip to content

Commit

Permalink
Add GetBlock request
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock committed Dec 10, 2024
1 parent 309e2b0 commit 21340ae
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 0 deletions.
12 changes: 12 additions & 0 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeEr
return res;
}

void TEvBlobStorage::TEvGetBlock::ToSpan(NWilson::TSpan& span) const {
span
.Attribute("TabletId", ::ToString(TabletId));
}

std::unique_ptr<TEvBlobStorage::TEvGetBlockResult> TEvBlobStorage::TEvGetBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvGetBlockResult>(status, TabletId, 0);
res->ErrorReason = errorReason;
return res;
}

void TEvBlobStorage::TEvPatch::ToSpan(NWilson::TSpan& span) const {
span
.Attribute("OriginalGroupId", OriginalGroupId)
Expand Down
65 changes: 65 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ struct TEvBlobStorage {
EvAssimilate,

EvGetQueuesInfo, // for debugging purposes
EvGetBlock,

//
EvPutResult = EvPut + 512, /// 268 632 576
Expand All @@ -509,6 +510,7 @@ struct TEvBlobStorage {
EvAssimilateResult,

EvQueuesInfo, // for debugging purposes
EvGetBlockResult,

// proxy <-> vdisk interface
EvVPut = EvPut + 2 * 512, /// 268 633 088
Expand Down Expand Up @@ -915,6 +917,7 @@ struct TEvBlobStorage {

struct TEvPutResult;
struct TEvGetResult;
struct TEvGetBlockResult;
struct TEvBlockResult;
struct TEvDiscoverResult;
struct TEvRangeResult;
Expand Down Expand Up @@ -1330,6 +1333,68 @@ struct TEvBlobStorage {
}
};

struct TEvGetBlock : public TEventLocal<TEvGetBlock, EvGetBlock> {
const ui64 TabletId;
const TInstant Deadline;
ui32 RestartCounter = 0;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlock(ui64 tabletId, TInstant deadline)
: TabletId(tabletId)
, Deadline(deadline)
{}

TString Print(bool /*isFull*/) const {
TStringStream str;
str << "TEvGetBlock {TabletId# " << TabletId
<< " Deadline# " << Deadline
<< "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}

ui32 CalculateSize() const {
return sizeof(*this);
}

void ToSpan(NWilson::TSpan& span) const;

std::unique_ptr<TEvGetBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
TGroupId groupId);
};

struct TEvGetBlockResult : public TEventLocal<TEvGetBlockResult, EvGetBlockResult> {
NKikimrProto::EReplyStatus Status;
ui64 TabletId;
ui32 BlockedGeneration;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 blockedGeneration)
: Status(status)
, TabletId(tabletId)
, BlockedGeneration(blockedGeneration)
{}

TString Print(bool /*isFull*/) const {
TStringStream str;
str << "TEvGetBlockResult {Status# " << NKikimrProto::EReplyStatus_Name(Status).data();
str << " TabletId# " << TabletId << " BlockedGeneration# " << BlockedGeneration;
if (ErrorReason.size()) {
str << " ErrorReason# \"" << ErrorReason << "\"";
}
str << "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}
};

struct TEvBlock : public TEventLocal<TEvBlock, EvBlock> {
const ui64 TabletId;
const ui32 Generation;
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus sta
XX(TEvBlobStorage::TEvPut) \
XX(TEvBlobStorage::TEvGet) \
XX(TEvBlobStorage::TEvBlock) \
XX(TEvBlobStorage::TEvGetBlock) \
XX(TEvBlobStorage::TEvDiscover) \
XX(TEvBlobStorage::TEvRange) \
XX(TEvBlobStorage::TEvCollectGarbage) \
Expand Down Expand Up @@ -486,6 +487,16 @@ struct TBlobStorageGroupBlockParameters {
};
IActor* CreateBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters params);

struct TBlobStorageGroupGetBlockParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvGetBlock> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
.LogComponent = NKikimrServices::BS_PROXY_GETBLOCK,
.Name = "DSProxy.GetBlock",
.Activity = NKikimrServices::TActivity::BS_GROUP_GETBLOCK,
};
};
IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params);

struct TBlobStorageGroupStatusParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvStatus> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
Expand Down
128 changes: 128 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_block.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#include "dsproxy.h"
#include "dsproxy_mon.h"
#include "dsproxy_quorum_tracker.h"
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>

namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// GET BLOCK request
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

class TBlobStorageGroupGetBlockRequest : public TBlobStorageGroupRequestActor {
const ui64 TabletId;
ui64 Generation;
const TInstant Deadline;
ui64 Requests = 0;
ui64 Responses = 0;
TGroupQuorumTracker QuorumTracker;

void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev) {
ProcessReplyFromQueue(ev->Get());
const NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record;
Y_ABORT_UNLESS(record.HasStatus());
NKikimrProto::EReplyStatus status = record.GetStatus();
Y_ABORT_UNLESS(record.HasVDiskID());
const TVDiskID vdisk = VDiskIDFromVDiskID(record.GetVDiskID());

DSP_LOG_LOG_S(PriorityForStatusInbound(status), "DSPGB01", "Handle TEvVGetBlockResult"
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
<< " From# " << vdisk.ToString()
<< " NodeId# " << Info->GetActorId(vdisk).NodeId());

if (record.HasGeneration()) {
Generation = Max<ui32>(Generation, record.GetGeneration());
}
if (status == NKikimrProto::NODATA) {
status = NKikimrProto::OK; // assume OK for quorum tracker
}
++Responses;

switch (const NKikimrProto::EReplyStatus overallStatus = QuorumTracker.ProcessReply(vdisk, status)) {
case NKikimrProto::OK:
if (Responses == Requests) {
ReplyAndDie(NKikimrProto::OK);
}
break;

case NKikimrProto::ERROR:
ReplyAndDie(NKikimrProto::ERROR);
break;

default:
break;
}
}

void ReplyAndDie(NKikimrProto::EReplyStatus status) override {
auto result = std::make_unique<TEvBlobStorage::TEvGetBlockResult>(status, TabletId, Generation);
result->ErrorReason = ErrorReason;
DSP_LOG_DEBUG_S("DSPGB02", "ReplyAndDie Result# " << result->Print(false));
SendResponseAndDie(std::move(result));
}

void SendGetBlockRequest(const TVDiskID& vdiskId) {
DSP_LOG_DEBUG_S("DSPB03", "Sending TEvVBlock Tablet# " << TabletId
<< " Generation# " << Generation
<< " vdiskId# " << vdiskId
<< " node# " << Info->GetActorId(vdiskId).NodeId());

auto msg = std::make_unique<TEvBlobStorage::TEvVGetBlock>(TabletId, vdiskId, Deadline);
SendToQueue(std::move(msg), 0);
}

std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override {
++*Mon->NodeMon->RestartGetBlock;
auto ev = std::make_unique<TEvBlobStorage::TEvGetBlock>(TabletId, Deadline);
ev->RestartCounter = counter;
return ev;
}
public:
::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override {
return Mon->ActiveGetBlock;
}

ERequestType GetRequestType() const override {
return ERequestType::GetBlock;
}

TBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, Deadline(params.Common.Event->Deadline)
, QuorumTracker(Info.Get())
{}

void Bootstrap() override {
DSP_LOG_INFO_S("DSPGB04", "bootstrap"
<< " ActorId# " << SelfId()
<< " Group# " << Info->GroupID
<< " Deadline# " << Deadline
<< " RestartCounter# " << RestartCounter);
for (const auto& vdisk : Info->GetVDisks()) {
SendGetBlockRequest(Info->GetVDiskId(vdisk.OrderNumber));
++Requests;
}

Become(&TBlobStorageGroupGetBlockRequest::StateWait);

if (Requests == 0) {
ReplyAndDie(NKikimrProto::OK);
}
}

STATEFN(StateWait) {
if (ProcessEvent(ev)) {
return;
}
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVGetBlockResult, Handle);
}
}
};

IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params) {
return new TBlobStorageGroupGetBlockRequest(params);
}

} // NKikimr
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
Mon->EventGet->Inc();
} else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvBlock>) {
Mon->EventBlock->Inc();
} else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvGetBlock>) {
Mon->EventGetBlock->Inc();
} else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvDiscover>) {
Mon->EventDiscover->Inc();
} else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvRange>) {
Expand Down Expand Up @@ -258,6 +260,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
void PushRequest(IActor *actor, TInstant deadline);
void CheckDeadlines();
void HandleNormal(TEvBlobStorage::TEvGet::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvGetBlock::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvPut::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvPatch::TPtr &ev);
Expand Down Expand Up @@ -371,6 +374,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
#define HANDLE_EVENTS(HANDLER) \
hFunc(TEvBlobStorage::TEvPut, HANDLER); \
hFunc(TEvBlobStorage::TEvGet, HANDLER); \
hFunc(TEvBlobStorage::TEvGetBlock, HANDLER); \
hFunc(TEvBlobStorage::TEvBlock, HANDLER); \
hFunc(TEvBlobStorage::TEvDiscover, HANDLER); \
hFunc(TEvBlobStorage::TEvRange, HANDLER); \
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
EventGet = EventGroup->GetCounter("EvGet", true);
EventGetResBytes = EventGroup->GetCounter("EvGetResBytes", true);
EventBlock = EventGroup->GetCounter("EvBlock", true);
EventGetBlock = EventGroup->GetCounter("EvGetBlock", true);
EventDiscover = EventGroup->GetCounter("EvDiscover", true);
EventRange = EventGroup->GetCounter("EvRange", true);
EventCollectGarbage = EventGroup->GetCounter("EvCollectGarbage", true);
Expand Down Expand Up @@ -68,6 +69,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
ActiveGet = ActiveRequestsGroup->GetCounter("ActiveGet");
ActiveGetCapacity = ActiveRequestsGroup->GetCounter("ActiveGetCapacity");
ActiveBlock = ActiveRequestsGroup->GetCounter("ActiveBlock");
ActiveGetBlock = ActiveRequestsGroup->GetCounter("ActiveGetBlock");
ActiveDiscover = ActiveRequestsGroup->GetCounter("ActiveDiscover");
ActiveRange = ActiveRequestsGroup->GetCounter("ActiveRange");
ActiveCollectGarbage = ActiveRequestsGroup->GetCounter("ActiveCollectGarbage");
Expand Down Expand Up @@ -101,6 +103,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
auto respStatGroup = NodeMon->Group->GetSubgroup("subsystem", "responseStatus");
RespStatPut.emplace(respStatGroup->GetSubgroup("request", "put"));
RespStatGet.emplace(respStatGroup->GetSubgroup("request", "get"));
RespStatGetBlock.emplace(respStatGroup->GetSubgroup("request", "getBlock"));
RespStatBlock.emplace(respStatGroup->GetSubgroup("request", "block"));
RespStatDiscover.emplace(respStatGroup->GetSubgroup("request", "discover"));
RespStatRange.emplace(respStatGroup->GetSubgroup("request", "range"));
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_mon.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum class ERequestType {
Status,
Assimilate,
Block,
GetBlock,
};

struct TRequestMonGroup {
Expand Down Expand Up @@ -196,6 +197,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
TRequestMonGroup StatusGroup;
TRequestMonGroup AssimilateGroup;
TRequestMonGroup BlockGroup;
TRequestMonGroup GetBlockGroup;

public:
TBlobStorageGroupProxyTimeStats TimeStats;
Expand All @@ -210,6 +212,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
// event counters
::NMonitoring::TDynamicCounters::TCounterPtr EventGet;
::NMonitoring::TDynamicCounters::TCounterPtr EventBlock;
::NMonitoring::TDynamicCounters::TCounterPtr EventGetBlock;
::NMonitoring::TDynamicCounters::TCounterPtr EventDiscover;
::NMonitoring::TDynamicCounters::TCounterPtr EventRange;
::NMonitoring::TDynamicCounters::TCounterPtr EventCollectGarbage;
Expand All @@ -230,6 +233,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
::NMonitoring::TDynamicCounters::TCounterPtr ActivePut;
::NMonitoring::TDynamicCounters::TCounterPtr ActivePutCapacity;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGet;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetBlock;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetCapacity;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveBlock;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveDiscover;
Expand All @@ -244,6 +248,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {

std::optional<TResponseStatusGroup> RespStatPut;
std::optional<TResponseStatusGroup> RespStatGet;
std::optional<TResponseStatusGroup> RespStatGetBlock;
std::optional<TResponseStatusGroup> RespStatBlock;
std::optional<TResponseStatusGroup> RespStatDiscover;
std::optional<TResponseStatusGroup> RespStatRange;
Expand All @@ -268,6 +273,8 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
case ERequestType::Status: return StatusGroup;
case ERequestType::Assimilate: return AssimilateGroup;
case ERequestType::Block: return BlockGroup;
case ERequestType::GetBlock: return GetBlockGroup;

}
Y_ABORT();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters>
auto group = Group->GetSubgroup("subsystem", "restart");
RestartPut = group->GetCounter("EvPut", true);
RestartGet = group->GetCounter("EvGet", true);
RestartGetBlock = group->GetCounter("EvGetBlock", true);
RestartPatch = group->GetCounter("EvPatch", true);
RestartBlock = group->GetCounter("EvBlock", true);
RestartDiscover = group->GetCounter("EvDiscover", true);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct TDsProxyNodeMon : public TThrRefBase {
// restart counters
::NMonitoring::TDynamicCounters::TCounterPtr RestartPut;
::NMonitoring::TDynamicCounters::TCounterPtr RestartGet;
::NMonitoring::TDynamicCounters::TCounterPtr RestartGetBlock;
::NMonitoring::TDynamicCounters::TCounterPtr RestartBlock;
::NMonitoring::TDynamicCounters::TCounterPtr RestartDiscover;
::NMonitoring::TDynamicCounters::TCounterPtr RestartRange;
Expand Down
Loading

0 comments on commit 21340ae

Please sign in to comment.