Skip to content

Commit

Permalink
Add GetBlock request
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock committed Dec 9, 2024
1 parent 309e2b0 commit d79000d
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 0 deletions.
67 changes: 67 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ struct TEvBlobStorage {
// user <-> proxy interface
EvPut = EventSpaceBegin(TKikimrEvents::ES_BLOBSTORAGE), /// 268 632 064
EvGet,
EvGetBlock,
EvBlock,
EvDiscover,
EvRange,
Expand All @@ -497,6 +498,7 @@ struct TEvBlobStorage {
//
EvPutResult = EvPut + 512, /// 268 632 576
EvGetResult,
EvGetBlockResult,
EvBlockResult,
EvDiscoverResult,
EvRangeResult,
Expand Down Expand Up @@ -915,6 +917,7 @@ struct TEvBlobStorage {

struct TEvPutResult;
struct TEvGetResult;
struct TEvGetBlockResult;
struct TEvBlockResult;
struct TEvDiscoverResult;
struct TEvRangeResult;
Expand Down Expand Up @@ -1330,6 +1333,70 @@ struct TEvBlobStorage {
}
};

struct TEvGetBlock : public TEventLocal<TEvGetBlock, EvGetBlock> {
const TInstant Deadline;
const ui64 TabletId;
ui32 RestartCounter = 0;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlock(TInstant deadline, ui64 tabletId)
: Deadline(deadline)
, TabletId(tabletId)
{}

TString Print(bool isFull) const {
Y_UNUSED(isFull);
TStringStream str;
str << "TEvGetBlock {TabletId# " << TabletId
<< " Deadline# " << Deadline.MilliSeconds()
<< "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}

ui32 CalculateSize() const {
return sizeof(*this);
}

void ToSpan(NWilson::TSpan& span) const;

std::unique_ptr<TEvGetBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
TGroupId groupId);
};

struct TEvGetBlockResult : public TEventLocal<TEvGetBlockResult, EvGetBlockResult> {
NKikimrProto::EReplyStatus Status;
ui64 TabletId;
ui32 BlockedGeneration;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 blockedGeneration)
: Status(status)
, TabletId(tabletId)
, BlockedGeneration(blockedGeneration)
{}

TString Print(bool isFull) const {
Y_UNUSED(isFull);
TStringStream str;
str << "TEvGetBlockResult {Status# " << NKikimrProto::EReplyStatus_Name(Status).data();
str << " TabletId# " << TabletId << " BlockedGeneration# " << BlockedGeneration;
if (ErrorReason.size()) {
str << " ErrorReason# \"" << ErrorReason << "\"";
}
str << "}";
return str.Str();
}

TString ToString() const {
return Print(false);
}
};

struct TEvBlock : public TEventLocal<TEvBlock, EvBlock> {
const ui64 TabletId;
const ui32 Generation;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,16 @@ struct TBlobStorageGroupBlockParameters {
};
IActor* CreateBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters params);

struct TBlobStorageGroupGetBlockParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvVGetBlock> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
.LogComponent = NKikimrServices::BS_PROXY_GETBLOCK,
.Name = "DSProxy.GetBlock",
.Activity = NKikimrServices::TActivity::BS_GROUP_GETBLOCK,
};
};
IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params);

struct TBlobStorageGroupStatusParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvStatus> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
Expand Down
139 changes: 139 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_block.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#include "dsproxy.h"
#include "dsproxy_mon.h"
#include "dsproxy_quorum_tracker.h"
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>

namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// GET BLOCK request
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

class TBlobStorageGroupGetBlockRequest : public TBlobStorageGroupRequestActor {
const ui64 TabletId;
ui64 Generation;
const TInstant Deadline;
ui64 Requests;
ui64 Responses;
TGroupQuorumTracker QuorumTracker;

void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev) {
ProcessReplyFromQueue(ev->Get());
const NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record;
Y_ABORT_UNLESS(record.HasStatus());
const NKikimrProto::EReplyStatus status = record.GetStatus();
Y_ABORT_UNLESS(record.HasVDiskID());
const TVDiskID vdisk = VDiskIDFromVDiskID(record.GetVDiskID());

DSP_LOG_LOG_S(PriorityForStatusInbound(status), "DSPGB01", "Handle TEvVGetBlockResult"
<< " status# " << NKikimrProto::EReplyStatus_Name(status).data()
<< " From# " << vdisk.ToString()
<< " NodeId# " << Info->GetActorId(vdisk).NodeId());

if (record.HasGeneration()) {
Generation = max(Generation, record.GetGeneration());
}
++Responses;

switch (const NKikimrProto::EReplyStatus overallStatus = QuorumTracker.ProcessReply(vdisk, status)) {
case NKikimrProto::OK:
if (Responses == Requests) {
ReplyAndDie(NKikimrProto::OK);
}
break;

case NKikimrProto::ERROR:
ReplyAndDie(NKikimrProto::ERROR);
break;

default:
break;
}
}

void ReplyAndDie(NKikimrProto::EReplyStatus status) {
auto result = std::make_unique<TEvBlobStorage::TEvGetBlockResult>(status);
result->Generation = Generation;
result->ErrorReason = ErrorReason;
DSP_LOG_DEBUG_S("DSPGB02", "ReplyAndDie Result# " << result->Print(false));
SendResponseAndDie(std::move(result));
}

void SendGetBlockRequest(const TVDiskID& vdiskId) {
const ui64 cookie = TVDiskIdShort(vdiskId).GetRaw();

DSP_LOG_DEBUG_S("DSPB03", "Sending TEvVBlock Tablet# " << TabletId
<< " Generation# " << Generation
<< " vdiskId# " << vdiskId
<< " node# " << Info->GetActorId(vdiskId).NodeId());

auto msg = std::make_unique<TEvBlobStorage::TEvVGetBlock>(TabletId, vdiskId, Deadline);
SendToQueue(std::move(msg), cookie);
}

std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override {
++*Mon->NodeMon->RestartGetBlock;
auto ev = std::make_unique<TEvBlobStorage::TEvVGetBlock>(Deadline);
ev->RestartCounter = counter;
return ev;
}
public:
::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override {
return Mon->ActiveGetBlock;
}

ERequestType GetRequestType() const override {
return ERequestType::GetBlock;
}

TBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, Deadline(params.Common.Event->Deadline)
, Requests(0)
, Responses(0)
, QuorumTracker(Info.Get())
{}

void Bootstrap() override {
DSP_LOG_INFO_S("DSPGB04", "bootstrap"
<< " ActorId# " << SelfId()
<< " Group# " << Info->GroupID
<< " Deadline# " << Deadline
<< " RestartCounter# " << RestartCounter);

for (const auto& vdisk : Info->GetVDisks()) {
const ui64 cookie = TVDiskIdShort(Info->GetVDiskId(vdisk.OrderNumber)).GetRaw();

auto vd = Info->GetVDiskId(vdisk.OrderNumber);
DSP_LOG_DEBUG_S("DSGB05", "Sending TEvVGetBlock"
<< " vDiskId# " << vd
<< " node# " << Info->GetActorId(vd).NodeId());

auto msg = std::make_unique<TEvBlobStorage::TEvVGetBlock>(TabletId, vd, Deadline);
SendToQueue(std::move(msg), cookie);
++Requests;
}

Become(&TBlobStorageGroupGetBlockRequest::StateWait);

if (Requests == 0) {
ReplyAndDie(NKikimrProto::OK);
}
}

STATEFN(StateWait) {
if (ProcessEvent(ev)) {
return;
}
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVGetBlockResult, Handle);
}
}
};

IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params) {
return new TBlobStorageGroupGetBlockRequest(params);
}

} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_mon.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
::NMonitoring::TDynamicCounters::TCounterPtr ActivePut;
::NMonitoring::TDynamicCounters::TCounterPtr ActivePutCapacity;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGet;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetBlock;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetCapacity;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveBlock;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveDiscover;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct TDsProxyNodeMon : public TThrRefBase {
// restart counters
::NMonitoring::TDynamicCounters::TCounterPtr RestartPut;
::NMonitoring::TDynamicCounters::TCounterPtr RestartGet;
::NMonitoring::TDynamicCounters::TCounterPtr RestartGetBlock;
::NMonitoring::TDynamicCounters::TCounterPtr RestartBlock;
::NMonitoring::TDynamicCounters::TCounterPtr RestartDiscover;
::NMonitoring::TDynamicCounters::TCounterPtr RestartRange;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ SRCS(
dsproxy_discover_m3dc.cpp
dsproxy_discover_m3of4.cpp
dsproxy_get.cpp
dsproxy_get_block.cpp
dsproxy_get_impl.cpp
dsproxy_get_impl.h
dsproxy_indexrestoreget.cpp
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/services/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum EServiceKikimr {
BS_VDISK_DEFRAG = 346;
BS_PROXY_ASSIMILATE = 347;
BS_VDISK_BALANCING = 2600;
BS_PROXY_GETBLOCK = 2601;

// DATASHARD section //
TX_DATASHARD = 290; //
Expand Down Expand Up @@ -1068,5 +1069,6 @@ message TActivity {
RESHUFFLE_KMEANS_SCAN_ACTOR = 650;
FEATURE_FLAGS_CONFIGURATOR = 651;
DATASHARD_READ_SCAN = 652;
BS_GROUP_GETBLOCK = 653;
};
};

0 comments on commit d79000d

Please sign in to comment.