diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index cfc914324e40..1d9ca8e3e1b1 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -978,6 +978,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& } Txs.emplace(tx.GetTxId(), tx); + SetTxInFlyCounter(); if (tx.HasStep()) { if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { @@ -994,6 +995,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& EndInitTransactions(); EndReadConfig(ctx); + + SetTxCounters(); } void TPersQueue::EndReadConfig(const TActorContext& ctx) @@ -3081,6 +3084,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { } MeteringSink.MayFlush(ctx.Now()); DeleteExpiredTransactions(ctx); + SetTxCounters(); ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } @@ -3101,6 +3105,33 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx) TryWriteTxs(ctx); } +void TPersQueue::SetTxCounters() +{ + SetTxCompleteLagCounter(); + SetTxInFlyCounter(); +} + +void TPersQueue::SetTxCompleteLagCounter() +{ + ui64 lag = 0; + + if (!TxQueue.empty()) { + ui64 firstTxStep = TxQueue.front().first; + ui64 currentStep = TAppData::TimeProvider->Now().MilliSeconds(); + + if (currentStep > firstTxStep) { + lag = currentStep - firstTxStep; + } + } + + Counters->Simple()[COUNTER_PQ_TABLET_TX_COMPLETE_LAG] = lag; +} + +void TPersQueue::SetTxInFlyCounter() +{ + Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size(); +} + void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal"); @@ -3575,6 +3606,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord(); TDistributedTransaction& tx = Txs[event.GetTxId()]; + SetTxInFlyCounter(); switch (tx.State) { case NKikimrPQ::TTransaction::UNKNOWN: @@ -3656,6 +3688,7 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) TryExecuteTxs(ctx, tx); TxQueue.emplace_back(step, txId); + SetTxCompleteLagCounter(); } else { LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << @@ -4392,6 +4425,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, TabletID(), tx.TxId, TxsOrder[tx.State].front()); + Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second, + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxQueue.front().second); + TxQueue.pop_front(); + SetTxCompleteLagCounter(); SendEvReadSetAckToSenders(ctx, tx); @@ -4411,13 +4449,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::DELETING: // The PQ tablet has persisted its state. Now she can delete the transaction and take the next one. - if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { - TxQueue.pop_front(); - } - DeleteWriteId(tx.WriteId); PQ_LOG_D("delete TxId " << tx.TxId); Txs.erase(tx.TxId); + SetTxInFlyCounter(); // If this was the last transaction, then you need to send responses to messages about changes // in the status of the PQ tablet (if they came) diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 2fdc0d1bf16e..5dee69618f72 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -441,6 +441,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void DeleteExpiredTransactions(const TActorContext& ctx); void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx); + void SetTxCounters(); + void SetTxCompleteLagCounter(); + void SetTxInFlyCounter(); + bool CanProcessProposeTransactionQueue() const; bool CanProcessPlanStepQueue() const; bool CanProcessWriteTxs() const; diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index 8c975d9a4b7b..6aab74eb959f 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -68,6 +68,8 @@ enum ESimpleCounters { COUNTER_PQ_TABLET_OPENED_PIPES = 5 [(CounterOpts) = {Name: "OpenedPipes"}]; COUNTER_PQ_TABLET_INFLIGHT = 6 [(CounterOpts) = {Name: "RequestsInflight"}]; + COUNTER_PQ_TABLET_TX_COMPLETE_LAG = 7 [(CounterOpts) = {Name: "TxCompleteLag"}]; + COUNTER_PQ_TABLET_TX_IN_FLY = 8 [(CounterOpts) = {Name: "TxInFly"}]; } enum EPercentileCounters {