Skip to content

Commit

Permalink
Merge 60c20be into 3250bfc
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Dec 27, 2024
2 parents 3250bfc + 60c20be commit b94295a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 20 deletions.
1 change: 1 addition & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ struct TEvPQ {
};

// Local event handled by TPartition, which answers it with either
// TEvGetWriteInfoResponse or TEvGetWriteInfoError.
struct TEvGetWriteInfoRequest : public TEventLocal<TEvGetWriteInfoRequest, EvGetWriteInfoRequest> {
// Actor that the reply is sent to. TPartition::HandleOnInit stores ev->Sender here
// before parking the event in PendingEvents; when the field is left unset,
// TPartition::Handle falls back to ev->Sender.
TActorId OriginalPartition;
};

struct TEvGetWriteInfoResponse : public TEventLocal<TEvGetWriteInfoResponse, EvGetWriteInfoResponse> {
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc

Y_ABORT_UNLESS(IsSupportive());

ev->Get()->OriginalPartition = ev->Sender;
PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

Expand Down Expand Up @@ -1132,11 +1133,16 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx

void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
TActorId originalPartition = ev->Get()->OriginalPartition;
if (!originalPartition) {
// delayed message
originalPartition = ev->Sender;
}
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
"Write info requested while writes are not complete");
ctx.Send(ev->Sender, response);
ctx.Send(originalPartition, response);
ClosedInternalPartition = true;
return;
}
Expand All @@ -1160,7 +1166,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
response->InputLags = std::move(SupportivePartitionTimeLag);

PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse");
ctx.Send(ev->Sender, response);
ctx.Send(originalPartition, response);
}

void TPartition::WriteInfoResponseHandler(
Expand Down
65 changes: 47 additions & 18 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,8 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi
void TPersQueue::MoveTopTxToCalculating(TDistributedTransaction& tx,
const TActorContext& ctx)
{
std::tie(ExecStep, ExecTxId) = TxQueue.front();
PQ_LOG_D("New ExecStep " << ExecStep << ", ExecTxId " << ExecTxId);
// std::tie(ExecStep, ExecTxId) = TxQueue.front();
// PQ_LOG_D("New ExecStep " << ExecStep << ", ExecTxId " << ExecTxId);

switch (tx.Kind) {
case NKikimrPQ::TTransaction::KIND_DATA:
Expand Down Expand Up @@ -999,9 +999,10 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
Txs.emplace(tx.GetTxId(), tx);

if (tx.HasStep()) {
if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
}
// if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
// PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
// }
PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
}

if (tx.HasWriteId()) {
Expand Down Expand Up @@ -2891,7 +2892,10 @@ void TPersQueue::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TA

void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvTabletPipe::TEvClientConnected");
PQ_LOG_D("Handle TEvTabletPipe::TEvClientConnected. " <<
"Status " << ev->Get()->Status <<
", TabletId " << ev->Get()->TabletId <<
", ClientId " << ev->Get()->ClientId);

Y_ABORT_UNLESS(ev->Get()->Leader, "Unexpectedly connected to follower of tablet %" PRIu64, ev->Get()->TabletId);

Expand All @@ -2915,13 +2919,15 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo

void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
{
for (auto& txId: GetBindedTxs(tabletId)) {
PQ_LOG_D("Restart pipe to tablet " << tabletId);
for (const ui64 txId : GetBindedTxs(tabletId)) {
auto* tx = GetTransaction(ctx, txId);
if (!tx) {
continue;
}

for (auto& message : tx->GetBindedMsgs(tabletId)) {
PQ_LOG_D("Resend message " << message.Type << " to tablet " << tabletId);
PipeClientCache->Send(ctx, tabletId, message.Type, message.Data);
}
}
Expand Down Expand Up @@ -3087,6 +3093,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) {
}
MeteringSink.MayFlush(ctx.Now());
DeleteExpiredTransactions(ctx);
SetTxCompleteLagCounter();
ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup());
}

Expand All @@ -3107,6 +3114,23 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx)
TryWriteTxs(ctx);
}

void TPersQueue::SetTxCompleteLagCounter()
{
ui64 lag = 0;

if (!TxQueue.empty() && MediatorTimeCastEntry) {
ui64 firstTxStep = TxQueue.front().first;
//ui64 txId = TxQueue.front().second;
ui64 currentStep = MediatorTimeCastEntry->Get(TabletID());

if (currentStep > firstTxStep) {
lag = currentStep - firstTxStep;
}
}

Counters->Simple()[COUNTER_PQ_TABLET_TX_COMPLETE_LAG] = lag;
}

void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal");
Expand Down Expand Up @@ -3326,7 +3350,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) {
if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) {
if (ack) {
PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer());
PQ_LOG_D("TxId " << event.GetTxId() << ". Send TEvReadSetAck to " << event.GetTabletProducer());
ctx.Send(ev->Sender, ack.release());
return;
}
Expand All @@ -3340,7 +3364,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
TryWriteTxs(ctx);
}
} else if (ack) {
PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer());
PQ_LOG_D("TxId " << event.GetTxId() << ". Send TEvReadSetAck to " << event.GetTabletProducer());
//
// for unknown transactions the acknowledgement is sent immediately
//
Expand Down Expand Up @@ -3661,6 +3685,9 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
TryExecuteTxs(ctx, tx);

TxQueue.emplace_back(step, txId);
PQ_LOG_D("TxId " << tx.TxId << " planned");

SetTxCompleteLagCounter();
} else {
PQ_LOG_W("Transaction already planned for step " << tx.Step <<
", Step: " << step <<
Expand Down Expand Up @@ -3849,7 +3876,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
TString body;
Y_ABORT_UNLESS(data.SerializeToString(&body));

PQ_LOG_D("Send TEvTxProcessing::TEvReadSet to " << tx.PredicateRecipients.size() << " receivers. Wait TEvTxProcessing::TEvReadSet from " << tx.PredicatesReceived.size() << " senders.");
PQ_LOG_D("TxId " << tx.TxId << ". Send TEvReadSet to " << tx.PredicateRecipients.size() << " receivers. Wait TEvReadSet from " << tx.PredicatesReceived.size() << " senders.");
for (auto& [receiverId, _] : tx.PredicateRecipients) {
if (receiverId != TabletID()) {
auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(tx.Step,
Expand All @@ -3859,7 +3886,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
TabletID(),
body,
0);
PQ_LOG_D("Send TEvReadSet to tablet " << receiverId);
//PQ_LOG_D("Send TEvReadSet to tablet " << receiverId);
SendToPipe(receiverId, tx, std::move(event), ctx);
}
}
Expand All @@ -3868,9 +3895,9 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx,
TDistributedTransaction& tx)
{
PQ_LOG_D("TPersQueue::SendEvReadSetAckToSenders");
PQ_LOG_D("TxId " << tx.TxId << ". Send TEvReadSetAck to " << tx.ReadSetAcks.size() << " receivers.");
for (auto& [target, event] : tx.ReadSetAcks) {
PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString());
//PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString());
ctx.Send(target, event.release());
}
}
Expand Down Expand Up @@ -4370,7 +4397,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
SendEvProposeTransactionResult(ctx, tx);
PQ_LOG_D("complete TxId " << tx.TxId);
PQ_LOG_D("TxId " << tx.TxId << " completed" <<
", FrontTxId " << TxQueue.front().second);

switch (tx.Kind) {
case NKikimrPQ::TTransaction::KIND_DATA:
Expand Down Expand Up @@ -4400,6 +4428,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(),
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
TabletID(), tx.TxId, TxsOrder[tx.State].front());
Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second,
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
TabletID(), tx.TxId, TxQueue.front().second);
TxQueue.pop_front();
SetTxCompleteLagCounter();

SendEvReadSetAckToSenders(ctx, tx);

Expand All @@ -4419,10 +4452,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

case NKikimrPQ::TTransaction::DELETING:
// The PQ tablet has persisted its state. Now she can delete the transaction and take the next one.
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
TxQueue.pop_front();
}

DeleteWriteId(tx.WriteId);
PQ_LOG_D("delete TxId " << tx.TxId);
Txs.erase(tx.TxId);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void DeleteExpiredTransactions(const TActorContext& ctx);
void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx);

void SetTxCompleteLagCounter();

bool CanProcessProposeTransactionQueue() const;
bool CanProcessPlanStepQueue() const;
bool CanProcessWriteTxs() const;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ enum ESimpleCounters {
COUNTER_PQ_TABLET_OPENED_PIPES = 5 [(CounterOpts) = {Name: "OpenedPipes"}];
COUNTER_PQ_TABLET_INFLIGHT = 6 [(CounterOpts) = {Name: "RequestsInflight"}];

COUNTER_PQ_TABLET_TX_COMPLETE_LAG = 7 [(CounterOpts) = {Name: "TxCompleteLag"}];
}

enum EPercentileCounters {
Expand Down

0 comments on commit b94295a

Please sign in to comment.