Skip to content

Commit

Permalink
TxInFly and TxCompleteLag metrics for PQ (ydb-platform#13072)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov committed Dec 31, 2024
1 parent 04f425f commit 55d7c7b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
43 changes: 39 additions & 4 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
}

Txs.emplace(tx.GetTxId(), tx);
SetTxInFlyCounter();

if (tx.HasStep()) {
if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
Expand All @@ -994,6 +995,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&

EndInitTransactions();
EndReadConfig(ctx);

SetTxCounters();
}

void TPersQueue::EndReadConfig(const TActorContext& ctx)
Expand Down Expand Up @@ -3081,6 +3084,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) {
}
MeteringSink.MayFlush(ctx.Now());
DeleteExpiredTransactions(ctx);
SetTxCounters();
ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup());
}

Expand All @@ -3101,6 +3105,33 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx)
TryWriteTxs(ctx);
}

void TPersQueue::SetTxCounters()
{
SetTxCompleteLagCounter();
SetTxInFlyCounter();
}

void TPersQueue::SetTxCompleteLagCounter()
{
ui64 lag = 0;

if (!TxQueue.empty()) {
ui64 firstTxStep = TxQueue.front().first;
ui64 currentStep = TAppData::TimeProvider->Now().MilliSeconds();

if (currentStep > firstTxStep) {
lag = currentStep - firstTxStep;
}
}

Counters->Simple()[COUNTER_PQ_TABLET_TX_COMPLETE_LAG] = lag;
}

void TPersQueue::SetTxInFlyCounter()
{
Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size();
}

void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal");
Expand Down Expand Up @@ -3575,6 +3606,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)

const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord();
TDistributedTransaction& tx = Txs[event.GetTxId()];
SetTxInFlyCounter();

switch (tx.State) {
case NKikimrPQ::TTransaction::UNKNOWN:
Expand Down Expand Up @@ -3656,6 +3688,7 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
TryExecuteTxs(ctx, tx);

TxQueue.emplace_back(step, txId);
SetTxCompleteLagCounter();
} else {
LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE,
"Tablet " << TabletID() <<
Expand Down Expand Up @@ -4392,6 +4425,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(),
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
TabletID(), tx.TxId, TxsOrder[tx.State].front());
Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second,
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
TabletID(), tx.TxId, TxQueue.front().second);
TxQueue.pop_front();
SetTxCompleteLagCounter();

SendEvReadSetAckToSenders(ctx, tx);

Expand All @@ -4411,13 +4449,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

case NKikimrPQ::TTransaction::DELETING:
// The PQ tablet has persisted its state. Now she can delete the transaction and take the next one.
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
TxQueue.pop_front();
}

DeleteWriteId(tx.WriteId);
PQ_LOG_D("delete TxId " << tx.TxId);
Txs.erase(tx.TxId);
SetTxInFlyCounter();

// If this was the last transaction, then you need to send responses to messages about changes
// in the status of the PQ tablet (if they came)
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void DeleteExpiredTransactions(const TActorContext& ctx);
void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx);

void SetTxCounters();
void SetTxCompleteLagCounter();
void SetTxInFlyCounter();

bool CanProcessProposeTransactionQueue() const;
bool CanProcessPlanStepQueue() const;
bool CanProcessWriteTxs() const;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/counters_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ enum ESimpleCounters {
COUNTER_PQ_TABLET_OPENED_PIPES = 5 [(CounterOpts) = {Name: "OpenedPipes"}];
COUNTER_PQ_TABLET_INFLIGHT = 6 [(CounterOpts) = {Name: "RequestsInflight"}];

COUNTER_PQ_TABLET_TX_COMPLETE_LAG = 7 [(CounterOpts) = {Name: "TxCompleteLag"}];
COUNTER_PQ_TABLET_TX_IN_FLY = 8 [(CounterOpts) = {Name: "TxInFly"}];
}

enum EPercentileCounters {
Expand Down

0 comments on commit 55d7c7b

Please sign in to comment.