Skip to content

Commit

Permalink
Add GetBlock request (#12431)
Browse files Browse the repository at this point in the history
Тесты уже замьючены
  • Loading branch information
mregrock authored Dec 11, 2024
1 parent 410cfb6 commit 4ca0cb6
Show file tree
Hide file tree
Showing 29 changed files with 379 additions and 1 deletion.
12 changes: 12 additions & 0 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeEr
return res;
}

void TEvBlobStorage::TEvGetBlock::ToSpan(NWilson::TSpan& span) const {
span
.Attribute("TabletId", ::ToString(TabletId));
}

std::unique_ptr<TEvBlobStorage::TEvGetBlockResult> TEvBlobStorage::TEvGetBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvGetBlockResult>(status, TabletId, 0);
res->ErrorReason = errorReason;
return res;
}

void TEvBlobStorage::TEvPatch::ToSpan(NWilson::TSpan& span) const {
span
.Attribute("OriginalGroupId", OriginalGroupId)
Expand Down
65 changes: 65 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ struct TEvBlobStorage {
EvAssimilate,

EvGetQueuesInfo, // for debugging purposes
EvGetBlock,

//
EvPutResult = EvPut + 512, /// 268 632 576
Expand All @@ -509,6 +510,7 @@ struct TEvBlobStorage {
EvAssimilateResult,

EvQueuesInfo, // for debugging purposes
EvGetBlockResult,

// proxy <-> vdisk interface
EvVPut = EvPut + 2 * 512, /// 268 633 088
Expand Down Expand Up @@ -915,6 +917,7 @@ struct TEvBlobStorage {

struct TEvPutResult;
struct TEvGetResult;
struct TEvGetBlockResult;
struct TEvBlockResult;
struct TEvDiscoverResult;
struct TEvRangeResult;
Expand Down Expand Up @@ -1330,6 +1333,68 @@ struct TEvBlobStorage {
}
};

struct TEvGetBlock : public TEventLocal<TEvGetBlock, EvGetBlock> {
const ui64 TabletId;
const TInstant Deadline;
ui32 RestartCounter = 0;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlock(ui64 tabletId, TInstant deadline)
: TabletId(tabletId)
, Deadline(deadline)
{}

TString Print(bool /*isFull*/) const {
TStringStream str;
str << "TEvGetBlock {TabletId# " << TabletId
<< " Deadline# " << Deadline
<< "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}

ui32 CalculateSize() const {
return sizeof(*this);
}

void ToSpan(NWilson::TSpan& span) const;

std::unique_ptr<TEvGetBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
TGroupId groupId);
};

struct TEvGetBlockResult : public TEventLocal<TEvGetBlockResult, EvGetBlockResult> {
NKikimrProto::EReplyStatus Status;
ui64 TabletId;
ui32 BlockedGeneration;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 blockedGeneration)
: Status(status)
, TabletId(tabletId)
, BlockedGeneration(blockedGeneration)
{}

TString Print(bool /*isFull*/) const {
TStringStream str;
str << "TEvGetBlockResult {Status# " << NKikimrProto::EReplyStatus_Name(Status).data();
str << " TabletId# " << TabletId << " BlockedGeneration# " << BlockedGeneration;
if (ErrorReason.size()) {
str << " ErrorReason# \"" << ErrorReason << "\"";
}
str << "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}
};

struct TEvBlock : public TEventLocal<TEvBlock, EvBlock> {
const ui64 TabletId;
const ui32 Generation;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/blob_depot/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace NKikimr::NBlobDepot {
XX(EvPut) \
XX(EvGet) \
XX(EvBlock) \
XX(EvGetBlock) \
XX(EvDiscover) \
XX(EvRange) \
XX(EvCollectGarbage) \
Expand Down Expand Up @@ -124,6 +125,7 @@ namespace NKikimr::NBlobDepot {
TEvBlobDepot::TEvRegisterAgentResult*,
TEvBlobDepot::TEvAllocateIdsResult*,
TEvBlobDepot::TEvBlockResult*,
TEvBlobDepot::TEvGetBlockResult*,
TEvBlobDepot::TEvQueryBlocksResult*,
TEvBlobDepot::TEvCollectGarbageResult*,
TEvBlobDepot::TEvCommitBlobSeqResult*,
Expand Down Expand Up @@ -228,6 +230,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvRegisterAgentResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvAllocateIdsResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvBlockResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvGetBlockResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvQueryBlocksResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvCollectGarbageResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blob_depot/agent/comm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ namespace NKikimr::NBlobDepot {
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCollectGarbage msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvGetBlock msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCommitBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blob_depot/agent/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ namespace NKikimr::NBlobDepot {
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvGetBlockResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/blob_depot/agent/storage_get_block.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "agent_impl.h"
#include "blocks.h"

namespace NKikimr::NBlobDepot {

template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGetBlock>(std::unique_ptr<IEventHandle> ev) {
class TGetBlockQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGetBlock> {
public:
using TBlobStorageQuery::TBlobStorageQuery;

void Initiate() override {
// TODO: implement GetBlock logic for BlobDepot
}

void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
// TODO: implement GetBlock logic for BlobDepot
}
};
return new TGetBlockQuery(*this, std::move(ev));
}

} // NKikimr::NBlobDepot
1 change: 1 addition & 0 deletions ydb/core/blob_depot/agent/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ LIBRARY()
# DS Proxy queries
storage_put.cpp
storage_get.cpp
storage_get_block.cpp
storage_block.cpp
storage_discover.cpp
storage_range.cpp
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blob_depot/blob_depot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvDiscardSpoiledBlobSeq, Handle);
hFunc(TEvBlobDepot::TEvResolve, Data->Handle);
hFunc(TEvBlobDepot::TEvBlock, BlocksManager->Handle);
hFunc(TEvBlobDepot::TEvGetBlock, BlocksManager->Handle);
hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle);
hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle);
hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
Expand Down Expand Up @@ -107,6 +108,7 @@ namespace NKikimr::NBlobDepot {
fFunc(TEvBlobDepot::EvDiscardSpoiledBlobSeq, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvResolve, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvBlock, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvGetBlock, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvQueryBlocks, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/blob_depot/blocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ namespace NKikimr::NBlobDepot {
TActivationContext::Send(response.release()); // not sent if the request got processed and response now is nullptr
}

void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvGetBlock::TPtr ev) {
// TODO: implement GetBlock logic for BlobDepot
}

void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
TAgent& agent = Self->GetAgent(ev->Recipient);
const ui32 agentId = agent.Connection->NodeId;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blob_depot/blocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace NKikimr::NBlobDepot {
void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid,
std::unique_ptr<IEventHandle> response);
void Handle(TEvBlobDepot::TEvBlock::TPtr ev);
void Handle(TEvBlobDepot::TEvGetBlock::TPtr ev);
void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev);

bool CheckBlock(ui64 tabletId, ui32 generation) const;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/blob_depot/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ namespace NKikimr {
EvPushNotify,
EvPushNotifyResult,
EvBlock,
EvGetBlock,
EvBlockResult,
EvGetBlockResult,
EvQueryBlocks,
EvQueryBlocksResult,
EvCollectGarbage,
Expand Down Expand Up @@ -63,6 +65,8 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotifyResult);
BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration, IssuerGuid);
BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason, TimeToLiveMs);
BLOBDEPOT_EVENT_PB(EvGetBlock, TabletId);
BLOBDEPOT_EVENT_PB(EvGetBlockResult, Status, ErrorReason, TabletId, BlockedGeneration);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvCollectGarbage);
Expand All @@ -81,6 +85,7 @@ namespace NKikimr {
template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; };
template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; };
template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; };
template<> struct TResponseFor<TEvGetBlock> { using Type = TEvGetBlockResult; };
template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; };
template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; };
template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; };
Expand All @@ -103,6 +108,7 @@ namespace NKikimr {
template<> struct TEventFor<NKikimrBlobDepot::TEvCollectGarbage> { using Type = TEvCollectGarbage; };
template<> struct TEventFor<NKikimrBlobDepot::TEvQueryBlocks> { using Type = TEvQueryBlocks; };
template<> struct TEventFor<NKikimrBlobDepot::TEvBlock> { using Type = TEvBlock; };
template<> struct TEventFor<NKikimrBlobDepot::TEvGetBlock> { using Type = TEvGetBlock; };
template<> struct TEventFor<NKikimrBlobDepot::TEvResolve> { using Type = TEvResolve; };
template<> struct TEventFor<NKikimrBlobDepot::TEvCommitBlobSeq> { using Type = TEvCommitBlobSeq; };
template<> struct TEventFor<NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq> { using Type = TEvDiscardSpoiledBlobSeq; };
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus sta
XX(TEvBlobStorage::TEvPut) \
XX(TEvBlobStorage::TEvGet) \
XX(TEvBlobStorage::TEvBlock) \
XX(TEvBlobStorage::TEvGetBlock) \
XX(TEvBlobStorage::TEvDiscover) \
XX(TEvBlobStorage::TEvRange) \
XX(TEvBlobStorage::TEvCollectGarbage) \
Expand Down Expand Up @@ -486,6 +487,16 @@ struct TBlobStorageGroupBlockParameters {
};
IActor* CreateBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters params);

struct TBlobStorageGroupGetBlockParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvGetBlock> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
.LogComponent = NKikimrServices::BS_PROXY_GETBLOCK,
.Name = "DSProxy.GetBlock",
.Activity = NKikimrServices::TActivity::BS_GROUP_GETBLOCK,
};
};
IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params);

struct TBlobStorageGroupStatusParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvStatus> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
Expand Down
Loading

0 comments on commit 4ca0cb6

Please sign in to comment.