From 86f54735a174a9c38107aaf93c87804310ae021f Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Tue, 24 Dec 2024 14:05:11 +0300 Subject: [PATCH 1/9] [+] COUNTER_PQ_TABLET_TX_COMPLETE_LAG --- ydb/core/persqueue/pq_impl.cpp | 17 +++++++++++++++++ ydb/core/persqueue/pq_impl.h | 2 ++ ydb/core/protos/counters_pq.proto | 1 + 3 files changed, 20 insertions(+) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 6ba9ccf0226a..fd01dc013bef 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3087,6 +3087,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { } MeteringSink.MayFlush(ctx.Now()); DeleteExpiredTransactions(ctx); + SetTxCompleteLagCounter(); ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } @@ -3107,6 +3108,22 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx) TryWriteTxs(ctx); } +void TPersQueue::SetTxCompleteLagCounter() +{ + ui64 lag = 0; + + if (!TxQueue.empty() && MediatorTimeCastEntry) { + ui64 firstTxStep = TxQueue.front().first; + ui64 currentStep = MediatorTimeCastEntry->Get(TabletID()); + + if (currentStep > firstTxStep) { + lag = currentStep - firstTxStep; + } + } + + Counters->Simple()[COUNTER_PQ_TABLET_TX_COMPLETE_LAG] = lag; +} + void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal"); diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 524e5d02b282..2b2320ce1cc1 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -443,6 +443,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void DeleteExpiredTransactions(const TActorContext& ctx); void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx); + void SetTxCompleteLagCounter(); + bool CanProcessProposeTransactionQueue() const; bool CanProcessPlanStepQueue() const; bool CanProcessWriteTxs() const; diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index 8c975d9a4b7b..fb75c5e5bb5e 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -68,6 +68,7 @@ enum ESimpleCounters { COUNTER_PQ_TABLET_OPENED_PIPES = 5 [(CounterOpts) = {Name: "OpenedPipes"}]; COUNTER_PQ_TABLET_INFLIGHT = 6 [(CounterOpts) = {Name: "RequestsInflight"}]; + COUNTER_PQ_TABLET_TX_COMPLETE_LAG = 7 [(CounterOpts) = {Name: "TxCompleteLag"}]; } enum EPercentileCounters { From 22c792c4989566d7fd41ff8c08da44eb0c2d6009 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 25 Dec 2024 17:05:07 +0300 Subject: [PATCH 2/9] [-] delayed message loses `Sender` --- ydb/core/persqueue/events/internal.h | 1 + ydb/core/persqueue/partition.cpp | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 6d23f435d95c..bc7e3df210db 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1066,6 +1066,7 @@ struct TEvPQ { }; struct TEvGetWriteInfoRequest : public TEventLocal { + TActorId OriginalPartition; }; struct TEvGetWriteInfoResponse : public TEventLocal { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 656d4ed8d9aa..8ec5e85faa9b 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1015,6 +1015,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc Y_ABORT_UNLESS(IsSupportive()); + ev->Get()->OriginalPartition = ev->Sender; PendingEvents.emplace_back(ev->ReleaseBase().Release()); } @@ -1132,11 +1133,16 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); + TActorId originalPartition = ev->Get()->OriginalPartition; + if (!originalPartition) { + // delayed message + originalPartition = ev->Sender; + } if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) { PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError"); auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId, "Write info requested while writes are not complete"); - ctx.Send(ev->Sender, response); + ctx.Send(originalPartition, response); ClosedInternalPartition = true; return; } @@ -1160,7 +1166,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon response->InputLags = std::move(SupportivePartitionTimeLag); PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse"); - ctx.Send(ev->Sender, response); + ctx.Send(originalPartition, response); } void TPartition::WriteInfoResponseHandler( From 60c20be0392952ceca50b4c16b05fa658e16c72a Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 25 Dec 2024 17:10:14 +0300 Subject: [PATCH 3/9] [*] draft --- ydb/core/persqueue/pq_impl.cpp | 48 +++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index fd01dc013bef..58589d667acc 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -819,8 +819,8 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi void TPersQueue::MoveTopTxToCalculating(TDistributedTransaction& tx, const TActorContext& ctx) { - std::tie(ExecStep, ExecTxId) = TxQueue.front(); - PQ_LOG_D("New ExecStep " << ExecStep << ", ExecTxId " << ExecTxId); +// std::tie(ExecStep, ExecTxId) = TxQueue.front(); +// PQ_LOG_D("New ExecStep " << ExecStep << ", ExecTxId " << ExecTxId); switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: @@ -999,9 +999,10 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& Txs.emplace(tx.GetTxId(), tx); if (tx.HasStep()) { - if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { - PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); - } +// if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { +// PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); +// } + PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); } if (tx.HasWriteId()) { @@ -2891,7 +2892,10 @@ void TPersQueue::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TA void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvTabletPipe::TEvClientConnected"); + PQ_LOG_D("Handle TEvTabletPipe::TEvClientConnected. " << + "Status " << ev->Get()->Status << + ", TabletId " << ev->Get()->TabletId << + ", ClientId " << ev->Get()->ClientId); Y_ABORT_UNLESS(ev->Get()->Leader, "Unexpectedly connected to follower of tablet %" PRIu64, ev->Get()->TabletId); @@ -2915,13 +2919,15 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx) { - for (auto& txId: GetBindedTxs(tabletId)) { + PQ_LOG_D("Restart pipe to tablet " << tabletId); + for (const ui64 txId : GetBindedTxs(tabletId)) { auto* tx = GetTransaction(ctx, txId); if (!tx) { continue; } for (auto& message : tx->GetBindedMsgs(tabletId)) { + PQ_LOG_D("Resend message " << message.Type << " to tablet " << tabletId); PipeClientCache->Send(ctx, tabletId, message.Type, message.Data); } } @@ -3114,6 +3120,7 @@ void TPersQueue::SetTxCompleteLagCounter() if (!TxQueue.empty() && MediatorTimeCastEntry) { ui64 firstTxStep = TxQueue.front().first; + //ui64 txId = TxQueue.front().second; ui64 currentStep = MediatorTimeCastEntry->Get(TabletID()); if (currentStep > firstTxStep) { @@ -3343,7 +3350,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) { if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) { if (ack) { - PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer()); + PQ_LOG_D("TxId " << event.GetTxId() << ". Send TEvReadSetAck to " << event.GetTabletProducer()); ctx.Send(ev->Sender, ack.release()); return; } @@ -3357,7 +3364,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte TryWriteTxs(ctx); } } else if (ack) { - PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer()); + PQ_LOG_D("TxId " << event.GetTxId() << ". Send TEvReadSetAck to " << event.GetTabletProducer()); // // для неизвестных транзакций подтверждение отправляется сразу // @@ -3678,6 +3685,9 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) TryExecuteTxs(ctx, tx); TxQueue.emplace_back(step, txId); + PQ_LOG_D("TxId " << tx.TxId << " planned"); + + SetTxCompleteLagCounter(); } else { PQ_LOG_W("Transaction already planned for step " << tx.Step << ", Step: " << step << @@ -3866,7 +3876,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TString body; Y_ABORT_UNLESS(data.SerializeToString(&body)); - PQ_LOG_D("Send TEvTxProcessing::TEvReadSet to " << tx.PredicateRecipients.size() << " receivers. Wait TEvTxProcessing::TEvReadSet from " << tx.PredicatesReceived.size() << " senders."); + PQ_LOG_D("TxId " << tx.TxId << ". Send TEvReadSet to " << tx.PredicateRecipients.size() << " receivers. Wait TEvReadSet from " << tx.PredicatesReceived.size() << " senders."); for (auto& [receiverId, _] : tx.PredicateRecipients) { if (receiverId != TabletID()) { auto event = std::make_unique(tx.Step, @@ -3876,7 +3886,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TabletID(), body, 0); - PQ_LOG_D("Send TEvReadSet to tablet " << receiverId); + //PQ_LOG_D("Send TEvReadSet to tablet " << receiverId); SendToPipe(receiverId, tx, std::move(event), ctx); } } @@ -3885,9 +3895,9 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx, TDistributedTransaction& tx) { - PQ_LOG_D("TPersQueue::SendEvReadSetAckToSenders"); + PQ_LOG_D("TxId " << tx.TxId << ". Send TEvReadSetAck to " << tx.ReadSetAcks.size() << " receivers."); for (auto& [target, event] : tx.ReadSetAcks) { - PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString()); + //PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString()); ctx.Send(target, event.release()); } } @@ -4387,7 +4397,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { SendEvProposeTransactionResult(ctx, tx); - PQ_LOG_D("complete TxId " << tx.TxId); + PQ_LOG_D("TxId " << tx.TxId << " completed" << + ", FrontTxId " << TxQueue.front().second); switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: @@ -4417,6 +4428,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, TabletID(), tx.TxId, TxsOrder[tx.State].front()); + Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second, + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxQueue.front().second); + TxQueue.pop_front(); + SetTxCompleteLagCounter(); SendEvReadSetAckToSenders(ctx, tx); @@ -4436,10 +4452,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::DELETING: // The PQ tablet has persisted its state. Now she can delete the transaction and take the next one. - if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { - TxQueue.pop_front(); - } - DeleteWriteId(tx.WriteId); PQ_LOG_D("delete TxId " << tx.TxId); Txs.erase(tx.TxId); From d816f4f749711c59b6af61d935d78e5cbd03b509 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 27 Dec 2024 12:26:43 +0300 Subject: [PATCH 4/9] [+] COUNTER_PQ_TABLET_TX_IN_FLY --- ydb/core/persqueue/pq_impl.cpp | 11 +++++++++++ ydb/core/persqueue/pq_impl.h | 1 + ydb/core/protos/counters_pq.proto | 1 + 3 files changed, 13 insertions(+) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 58589d667acc..03aa40cae131 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -997,6 +997,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& } Txs.emplace(tx.GetTxId(), tx); + SetTxInFlyCounter(); if (tx.HasStep()) { // if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { @@ -1014,6 +1015,9 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& EndInitTransactions(); EndReadConfig(ctx); + + SetTxCompleteLagCounter(); + SetTxInFlyCounter(); } void TPersQueue::EndReadConfig(const TActorContext& ctx) @@ -3131,6 +3135,11 @@ void TPersQueue::SetTxCompleteLagCounter() Counters->Simple()[COUNTER_PQ_TABLET_TX_COMPLETE_LAG] = lag; } +void TPersQueue::SetTxInFlyCounter() +{ + Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size(); +} + void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal"); @@ -3604,6 +3613,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord(); TDistributedTransaction& tx = Txs[event.GetTxId()]; + SetTxInFlyCounter(); switch (tx.State) { case NKikimrPQ::TTransaction::UNKNOWN: @@ -4455,6 +4465,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, DeleteWriteId(tx.WriteId); PQ_LOG_D("delete TxId " << tx.TxId); Txs.erase(tx.TxId); + SetTxInFlyCounter(); // If this was the last transaction, then you need to send responses to messages about changes // in the status of the PQ tablet (if they came) diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 2b2320ce1cc1..6394c3cae30f 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -444,6 +444,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx); void SetTxCompleteLagCounter(); + void SetTxInFlyCounter(); bool CanProcessProposeTransactionQueue() const; bool CanProcessPlanStepQueue() const; diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index fb75c5e5bb5e..6aab74eb959f 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -69,6 +69,7 @@ enum ESimpleCounters { COUNTER_PQ_TABLET_INFLIGHT = 6 [(CounterOpts) = {Name: "RequestsInflight"}]; COUNTER_PQ_TABLET_TX_COMPLETE_LAG = 7 [(CounterOpts) = {Name: "TxCompleteLag"}]; + COUNTER_PQ_TABLET_TX_IN_FLY = 8 [(CounterOpts) = {Name: "TxInFly"}]; } enum EPercentileCounters { From 15d22566aee32f89b6064df487560efacf1e5c69 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 27 Dec 2024 12:38:38 +0300 Subject: [PATCH 5/9] Revert "[-] delayed message loses `Sender`" This reverts commit 22c792c4989566d7fd41ff8c08da44eb0c2d6009. --- ydb/core/persqueue/events/internal.h | 1 - ydb/core/persqueue/partition.cpp | 10 ++-------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index bc7e3df210db..6d23f435d95c 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1066,7 +1066,6 @@ struct TEvPQ { }; struct TEvGetWriteInfoRequest : public TEventLocal { - TActorId OriginalPartition; }; struct TEvGetWriteInfoResponse : public TEventLocal { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 8ec5e85faa9b..656d4ed8d9aa 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1015,7 +1015,6 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc Y_ABORT_UNLESS(IsSupportive()); - ev->Get()->OriginalPartition = ev->Sender; PendingEvents.emplace_back(ev->ReleaseBase().Release()); } @@ -1133,16 +1132,11 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); - TActorId originalPartition = ev->Get()->OriginalPartition; - if (!originalPartition) { - // delayed message - originalPartition = ev->Sender; - } if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) { PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError"); auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId, "Write info requested while writes are not complete"); - ctx.Send(originalPartition, response); + ctx.Send(ev->Sender, response); ClosedInternalPartition = true; return; } @@ -1166,7 +1160,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon response->InputLags = std::move(SupportivePartitionTimeLag); PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse"); - ctx.Send(originalPartition, response); + ctx.Send(ev->Sender, response); } void TPartition::WriteInfoResponseHandler( From b069e399e5430a4fedceeb95ce875edb1ed8122c Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 27 Dec 2024 16:16:49 +0300 Subject: [PATCH 6/9] [*] revert changes --- ydb/core/persqueue/pq_impl.cpp | 35 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 03aa40cae131..74bd9a24c441 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -819,8 +819,8 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi void TPersQueue::MoveTopTxToCalculating(TDistributedTransaction& tx, const TActorContext& ctx) { -// std::tie(ExecStep, ExecTxId) = TxQueue.front(); -// PQ_LOG_D("New ExecStep " << ExecStep << ", ExecTxId " << ExecTxId); + std::tie(ExecStep, ExecTxId) = TxQueue.front(); + PQ_LOG_D("New ExecStep " << ExecStep << ", ExecTxId " << ExecTxId); switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: @@ -2896,10 +2896,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TA void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvTabletPipe::TEvClientConnected. " << - "Status " << ev->Get()->Status << - ", TabletId " << ev->Get()->TabletId << - ", ClientId " << ev->Get()->ClientId); + PQ_LOG_D("Handle TEvTabletPipe::TEvClientConnected"); Y_ABORT_UNLESS(ev->Get()->Leader, "Unexpectedly connected to follower of tablet %" PRIu64, ev->Get()->TabletId); @@ -2923,15 +2920,13 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx) { - PQ_LOG_D("Restart pipe to tablet " << tabletId); - for (const ui64 txId : GetBindedTxs(tabletId)) { + for (const ui64 txId: GetBindedTxs(tabletId)) { auto* tx = GetTransaction(ctx, txId); if (!tx) { continue; } for (auto& message : tx->GetBindedMsgs(tabletId)) { - PQ_LOG_D("Resend message " << message.Type << " to tablet " << tabletId); PipeClientCache->Send(ctx, tabletId, message.Type, message.Data); } } @@ -3122,10 +3117,9 @@ void TPersQueue::SetTxCompleteLagCounter() { ui64 lag = 0; - if (!TxQueue.empty() && MediatorTimeCastEntry) { + if (!TxQueue.empty()) { ui64 firstTxStep = TxQueue.front().first; - //ui64 txId = TxQueue.front().second; - ui64 currentStep = MediatorTimeCastEntry->Get(TabletID()); + ui64 currentStep = TAppData::TimeProvider->Now().MilliSeconds(); if (currentStep > firstTxStep) { lag = currentStep - firstTxStep; @@ -3359,7 +3353,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) { if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) { if (ack) { - PQ_LOG_D("TxId " << event.GetTxId() << ". Send TEvReadSetAck to " << event.GetTabletProducer()); + PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer()); ctx.Send(ev->Sender, ack.release()); return; } @@ -3373,7 +3367,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte TryWriteTxs(ctx); } } else if (ack) { - PQ_LOG_D("TxId " << event.GetTxId() << ". Send TEvReadSetAck to " << event.GetTabletProducer()); + PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer()); // // для неизвестных транзакций подтверждение отправляется сразу // @@ -3695,8 +3689,6 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) TryExecuteTxs(ctx, tx); TxQueue.emplace_back(step, txId); - PQ_LOG_D("TxId " << tx.TxId << " planned"); - SetTxCompleteLagCounter(); } else { PQ_LOG_W("Transaction already planned for step " << tx.Step << @@ -3886,7 +3878,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TString body; Y_ABORT_UNLESS(data.SerializeToString(&body)); - PQ_LOG_D("TxId " << tx.TxId << ". Send TEvReadSet to " << tx.PredicateRecipients.size() << " receivers. Wait TEvReadSet from " << tx.PredicatesReceived.size() << " senders."); + PQ_LOG_D("Send TEvTxProcessing::TEvReadSet to " << tx.PredicateRecipients.size() << " receivers. Wait TEvTxProcessing::TEvReadSet from " << tx.PredicatesReceived.size() << " senders."); for (auto& [receiverId, _] : tx.PredicateRecipients) { if (receiverId != TabletID()) { auto event = std::make_unique(tx.Step, @@ -3896,7 +3888,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TabletID(), body, 0); - //PQ_LOG_D("Send TEvReadSet to tablet " << receiverId); + PQ_LOG_D("Send TEvReadSet to tablet " << receiverId); SendToPipe(receiverId, tx, std::move(event), ctx); } } @@ -3905,9 +3897,9 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx, TDistributedTransaction& tx) { - PQ_LOG_D("TxId " << tx.TxId << ". Send TEvReadSetAck to " << tx.ReadSetAcks.size() << " receivers."); + PQ_LOG_D("TPersQueue::SendEvReadSetAckToSenders"); for (auto& [target, event] : tx.ReadSetAcks) { - //PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString()); + PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString()); ctx.Send(target, event.release()); } } @@ -4407,8 +4399,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { SendEvProposeTransactionResult(ctx, tx); - PQ_LOG_D("TxId " << tx.TxId << " completed" << - ", FrontTxId " << TxQueue.front().second); + PQ_LOG_D("complete TxId " << tx.TxId); switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: From bf3833259e3d6181aaced7c4f41e1df83bc1da38 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 27 Dec 2024 16:17:08 +0300 Subject: [PATCH 7/9] [-] the `SetTxInFlyCounter` function call is missing --- ydb/core/persqueue/pq_impl.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 74bd9a24c441..363b925af25d 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3093,6 +3093,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { MeteringSink.MayFlush(ctx.Now()); DeleteExpiredTransactions(ctx); SetTxCompleteLagCounter(); + SetTxInFlyCounter(); ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } From a1e5e617503f01a4667db3f4ce2c489f474a2ced Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 27 Dec 2024 16:25:54 +0300 Subject: [PATCH 8/9] [*] revert changes --- ydb/core/persqueue/pq_impl.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 363b925af25d..69aad3b7d774 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1000,10 +1000,9 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& SetTxInFlyCounter(); if (tx.HasStep()) { -// if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { -// PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); -// } - PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); + if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { + PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); + } } if (tx.HasWriteId()) { @@ -2920,7 +2919,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx) { - for (const ui64 txId: GetBindedTxs(tabletId)) { + for (auto& txId: GetBindedTxs(tabletId)) { auto* tx = GetTransaction(ctx, txId); if (!tx) { continue; From 559999fa2c7da51047d2214df2a1330cab91123c Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 27 Dec 2024 16:53:34 +0300 Subject: [PATCH 9/9] [/] duplication in the code --- ydb/core/persqueue/pq_impl.cpp | 12 ++++++++---- ydb/core/persqueue/pq_impl.h | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 69aad3b7d774..2bae93536260 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1015,8 +1015,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& EndInitTransactions(); EndReadConfig(ctx); - SetTxCompleteLagCounter(); - SetTxInFlyCounter(); + SetTxCounters(); } void TPersQueue::EndReadConfig(const TActorContext& ctx) @@ -3091,8 +3090,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { } MeteringSink.MayFlush(ctx.Now()); DeleteExpiredTransactions(ctx); - SetTxCompleteLagCounter(); - SetTxInFlyCounter(); + SetTxCounters(); ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } @@ -3113,6 +3111,12 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx) TryWriteTxs(ctx); } +void TPersQueue::SetTxCounters() +{ + SetTxCompleteLagCounter(); + SetTxInFlyCounter(); +} + void TPersQueue::SetTxCompleteLagCounter() { ui64 lag = 0; diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 6394c3cae30f..d3441f76b262 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -443,6 +443,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void DeleteExpiredTransactions(const TActorContext& ctx); void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx); + void SetTxCounters(); void SetTxCompleteLagCounter(); void SetTxInFlyCounter();