Skip to content

Commit

Permalink
Add GetBlock request
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock committed Dec 10, 2024
1 parent 309e2b0 commit 6bb8ff3
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 0 deletions.
12 changes: 12 additions & 0 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeEr
return res;
}

void TEvBlobStorage::TEvGetBlock::ToSpan(NWilson::TSpan& span) const {
span
.Attribute("TabletId", ::ToString(TabletId));
}

std::unique_ptr<TEvBlobStorage::TEvGetBlockResult> TEvBlobStorage::TEvGetBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvGetBlockResult>(status, TabletId, 0);
res->ErrorReason = errorReason;
return res;
}

void TEvBlobStorage::TEvPatch::ToSpan(NWilson::TSpan& span) const {
span
.Attribute("OriginalGroupId", OriginalGroupId)
Expand Down
65 changes: 65 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ struct TEvBlobStorage {
EvAssimilate,

EvGetQueuesInfo, // for debugging purposes
EvGetBlock,

//
EvPutResult = EvPut + 512, /// 268 632 576
Expand All @@ -509,6 +510,7 @@ struct TEvBlobStorage {
EvAssimilateResult,

EvQueuesInfo, // for debugging purposes
EvGetBlockResult,

// proxy <-> vdisk interface
EvVPut = EvPut + 2 * 512, /// 268 633 088
Expand Down Expand Up @@ -915,6 +917,7 @@ struct TEvBlobStorage {

struct TEvPutResult;
struct TEvGetResult;
struct TEvGetBlockResult;
struct TEvBlockResult;
struct TEvDiscoverResult;
struct TEvRangeResult;
Expand Down Expand Up @@ -1330,6 +1333,68 @@ struct TEvBlobStorage {
}
};

struct TEvGetBlock : public TEventLocal<TEvGetBlock, EvGetBlock> {
const ui64 TabletId;
const TInstant Deadline;
ui32 RestartCounter = 0;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlock(ui64 tabletId, TInstant deadline)
: TabletId(tabletId)
, Deadline(deadline)
{}

TString Print(bool /*isFull*/) const {
TStringStream str;
str << "TEvGetBlock {TabletId# " << TabletId
<< " Deadline# " << Deadline
<< "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}

ui32 CalculateSize() const {
return sizeof(*this);
}

void ToSpan(NWilson::TSpan& span) const;

std::unique_ptr<TEvGetBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
TGroupId groupId);
};

struct TEvGetBlockResult : public TEventLocal<TEvGetBlockResult, EvGetBlockResult> {
NKikimrProto::EReplyStatus Status;
ui64 TabletId;
ui32 BlockedGeneration;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 blockedGeneration)
: Status(status)
, TabletId(tabletId)
, BlockedGeneration(blockedGeneration)
{}

TString Print(bool /*isFull*/) const {
TStringStream str;
str << "TEvGetBlockResult {Status# " << NKikimrProto::EReplyStatus_Name(Status).data();
str << " TabletId# " << TabletId << " BlockedGeneration# " << BlockedGeneration;
if (ErrorReason.size()) {
str << " ErrorReason# \"" << ErrorReason << "\"";
}
str << "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}
};

struct TEvBlock : public TEventLocal<TEvBlock, EvBlock> {
const ui64 TabletId;
const ui32 Generation;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,16 @@ struct TBlobStorageGroupBlockParameters {
};
IActor* CreateBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters params);

struct TBlobStorageGroupGetBlockParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvGetBlock> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
.LogComponent = NKikimrServices::BS_PROXY_GETBLOCK,
.Name = "DSProxy.GetBlock",
.Activity = NKikimrServices::TActivity::BS_GROUP_GETBLOCK,
};
};
IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params);

struct TBlobStorageGroupStatusParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvStatus> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
Expand Down
126 changes: 126 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_block.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#include "dsproxy.h"
#include "dsproxy_mon.h"
#include "dsproxy_quorum_tracker.h"
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>

namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// GET BLOCK request
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

class TBlobStorageGroupGetBlockRequest : public TBlobStorageGroupRequestActor {
const ui64 TabletId;
ui64 Generation;
const TInstant Deadline;
ui64 Requests = 0;
ui64 Responses = 0;
TGroupQuorumTracker QuorumTracker;

void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev) {
ProcessReplyFromQueue(ev->Get());
const NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record;
Y_ABORT_UNLESS(record.HasStatus());
const NKikimrProto::EReplyStatus status = record.GetStatus();
Y_ABORT_UNLESS(record.HasVDiskID());
const TVDiskID vdisk = VDiskIDFromVDiskID(record.GetVDiskID());

DSP_LOG_LOG_S(PriorityForStatusInbound(status), "DSPGB01", "Handle TEvVGetBlockResult"
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
<< " From# " << vdisk.ToString()
<< " NodeId# " << Info->GetActorId(vdisk).NodeId());

if (record.HasGeneration()) {
Generation = Max<ui32>(Generation, record.GetGeneration());
}
++Responses;

switch (const NKikimrProto::EReplyStatus overallStatus = QuorumTracker.ProcessReply(vdisk, status)) {
case NKikimrProto::OK:
if (Responses == Requests) {
ReplyAndDie(NKikimrProto::OK);
}
break;

case NKikimrProto::ERROR:
ReplyAndDie(NKikimrProto::ERROR);
break;

default:
break;
}
}

void ReplyAndDie(NKikimrProto::EReplyStatus status) override {
auto result = std::make_unique<TEvBlobStorage::TEvGetBlockResult>(status, TabletId, Generation);
result->ErrorReason = ErrorReason;
DSP_LOG_DEBUG_S("DSPGB02", "ReplyAndDie Result# " << result->Print(false));
SendResponseAndDie(std::move(result));
}

void SendGetBlockRequest(const TVDiskID& vdiskId) {
DSP_LOG_DEBUG_S("DSPB03", "Sending TEvVBlock Tablet# " << TabletId
<< " Generation# " << Generation
<< " vdiskId# " << vdiskId
<< " node# " << Info->GetActorId(vdiskId).NodeId());

auto msg = std::make_unique<TEvBlobStorage::TEvVGetBlock>(TabletId, vdiskId, Deadline);
SendToQueue(std::move(msg), 0);
}

std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override {
++*Mon->NodeMon->RestartGetBlock;
auto ev = std::make_unique<TEvBlobStorage::TEvGetBlock>(TabletId, Deadline);
ev->RestartCounter = counter;
return ev;
}
public:
::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override {
return Mon->ActiveGetBlock;
}

ERequestType GetRequestType() const override {
return ERequestType::GetBlock;
}

TBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, Deadline(params.Common.Event->Deadline)
, QuorumTracker(Info.Get())
{}

void Bootstrap() override {
DSP_LOG_INFO_S("DSPGB04", "bootstrap"
<< " ActorId# " << SelfId()
<< " Group# " << Info->GroupID
<< " Deadline# " << Deadline
<< " RestartCounter# " << RestartCounter);

for (const auto& vdisk : Info->GetVDisks()) {
SendGetBlockRequest(Info->GetVDiskId(vdisk.OrderNumber));
++Requests;
}

Become(&TBlobStorageGroupGetBlockRequest::StateWait);

if (Requests == 0) {
ReplyAndDie(NKikimrProto::OK);
}
}

STATEFN(StateWait) {
if (ProcessEvent(ev)) {
return;
}
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVGetBlockResult, Handle);
}
}
};

IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params) {
return new TBlobStorageGroupGetBlockRequest(params);
}

} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
ActiveGet = ActiveRequestsGroup->GetCounter("ActiveGet");
ActiveGetCapacity = ActiveRequestsGroup->GetCounter("ActiveGetCapacity");
ActiveBlock = ActiveRequestsGroup->GetCounter("ActiveBlock");
ActiveGetBlock = ActiveRequestsGroup->GetCounter("ActiveGetBlock");
ActiveDiscover = ActiveRequestsGroup->GetCounter("ActiveDiscover");
ActiveRange = ActiveRequestsGroup->GetCounter("ActiveRange");
ActiveCollectGarbage = ActiveRequestsGroup->GetCounter("ActiveCollectGarbage");
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_mon.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum class ERequestType {
Status,
Assimilate,
Block,
GetBlock,
};

struct TRequestMonGroup {
Expand Down Expand Up @@ -196,6 +197,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
TRequestMonGroup StatusGroup;
TRequestMonGroup AssimilateGroup;
TRequestMonGroup BlockGroup;
TRequestMonGroup GetBlockGroup;

public:
TBlobStorageGroupProxyTimeStats TimeStats;
Expand Down Expand Up @@ -230,6 +232,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
::NMonitoring::TDynamicCounters::TCounterPtr ActivePut;
::NMonitoring::TDynamicCounters::TCounterPtr ActivePutCapacity;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGet;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetBlock;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetCapacity;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveBlock;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveDiscover;
Expand Down Expand Up @@ -268,6 +271,8 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
case ERequestType::Status: return StatusGroup;
case ERequestType::Assimilate: return AssimilateGroup;
case ERequestType::Block: return BlockGroup;
case ERequestType::GetBlock: return GetBlockGroup;

}
Y_ABORT();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters>
auto group = Group->GetSubgroup("subsystem", "restart");
RestartPut = group->GetCounter("EvPut", true);
RestartGet = group->GetCounter("EvGet", true);
RestartGetBlock = group->GetCounter("EvGetBlock", true);
RestartPatch = group->GetCounter("EvPatch", true);
RestartBlock = group->GetCounter("EvBlock", true);
RestartDiscover = group->GetCounter("EvDiscover", true);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct TDsProxyNodeMon : public TThrRefBase {
// restart counters
::NMonitoring::TDynamicCounters::TCounterPtr RestartPut;
::NMonitoring::TDynamicCounters::TCounterPtr RestartGet;
::NMonitoring::TDynamicCounters::TCounterPtr RestartGetBlock;
::NMonitoring::TDynamicCounters::TCounterPtr RestartBlock;
::NMonitoring::TDynamicCounters::TCounterPtr RestartDiscover;
::NMonitoring::TDynamicCounters::TCounterPtr RestartRange;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ SRCS(
dsproxy_discover_m3dc.cpp
dsproxy_discover_m3of4.cpp
dsproxy_get.cpp
dsproxy_get_block.cpp
dsproxy_get_impl.cpp
dsproxy_get_impl.h
dsproxy_indexrestoreget.cpp
Expand Down
21 changes: 21 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/get_block.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>

Y_UNIT_TEST_SUITE(GetBlock) {
Y_UNIT_TEST(EmptyGetBlockCmd) {
TEnvironmentSetup env({
.Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
});
auto& runtime = env.Runtime;

env.CreateBoxAndPool(1, 1);
auto info = env.GetGroupInfo(env.GetGroups().front());

auto ev = std::make_unique<TEvBlobStorage::TEvGetBlock>(1u, TInstant::Max());
const TActorId edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
runtime->WrapInActorContext(edge, [&] {
SendToBSProxy(edge, info->GroupID, ev.release());
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetBlockResult>(edge);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::ERROR);
}
}
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_blobstorage/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ SRCS(
gc.cpp
gc_quorum_3dc.cpp
get.cpp
get_block.cpp
group_reconfiguration.cpp
incorrect_queries.cpp
index_restore_get.cpp
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/services/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum EServiceKikimr {
BS_VDISK_DEFRAG = 346;
BS_PROXY_ASSIMILATE = 347;
BS_VDISK_BALANCING = 2600;
BS_PROXY_GETBLOCK = 2601;

// DATASHARD section //
TX_DATASHARD = 290; //
Expand Down Expand Up @@ -1068,5 +1069,6 @@ message TActivity {
RESHUFFLE_KMEANS_SCAN_ACTOR = 650;
FEATURE_FLAGS_CONFIGURATOR = 651;
DATASHARD_READ_SCAN = 652;
BS_GROUP_GETBLOCK = 653;
};
};

0 comments on commit 6bb8ff3

Please sign in to comment.