Skip to content

Commit

Permalink
Merge a1e5e61 into fa346a5
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Dec 27, 2024
2 parents fa346a5 + a1e5e61 commit aeea1d1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
39 changes: 35 additions & 4 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
}

Txs.emplace(tx.GetTxId(), tx);
SetTxInFlyCounter();

if (tx.HasStep()) {
if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
Expand All @@ -1013,6 +1014,9 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&

EndInitTransactions();
EndReadConfig(ctx);

SetTxCompleteLagCounter();
SetTxInFlyCounter();
}

void TPersQueue::EndReadConfig(const TActorContext& ctx)
Expand Down Expand Up @@ -3087,6 +3091,8 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) {
}
MeteringSink.MayFlush(ctx.Now());
DeleteExpiredTransactions(ctx);
SetTxCompleteLagCounter();
SetTxInFlyCounter();
ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup());
}

Expand All @@ -3107,6 +3113,27 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx)
TryWriteTxs(ctx);
}

void TPersQueue::SetTxCompleteLagCounter()
{
ui64 lag = 0;

if (!TxQueue.empty()) {
ui64 firstTxStep = TxQueue.front().first;
ui64 currentStep = TAppData::TimeProvider->Now().MilliSeconds();

if (currentStep > firstTxStep) {
lag = currentStep - firstTxStep;
}
}

Counters->Simple()[COUNTER_PQ_TABLET_TX_COMPLETE_LAG] = lag;
}

void TPersQueue::SetTxInFlyCounter()
{
Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size();
}

void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal");
Expand Down Expand Up @@ -3580,6 +3607,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)

const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord();
TDistributedTransaction& tx = Txs[event.GetTxId()];
SetTxInFlyCounter();

switch (tx.State) {
case NKikimrPQ::TTransaction::UNKNOWN:
Expand Down Expand Up @@ -3661,6 +3689,7 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
TryExecuteTxs(ctx, tx);

TxQueue.emplace_back(step, txId);
SetTxCompleteLagCounter();
} else {
PQ_LOG_W("Transaction already planned for step " << tx.Step <<
", Step: " << step <<
Expand Down Expand Up @@ -4400,6 +4429,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(),
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
TabletID(), tx.TxId, TxsOrder[tx.State].front());
Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second,
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
TabletID(), tx.TxId, TxQueue.front().second);
TxQueue.pop_front();
SetTxCompleteLagCounter();

SendEvReadSetAckToSenders(ctx, tx);

Expand All @@ -4419,13 +4453,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

case NKikimrPQ::TTransaction::DELETING:
// The PQ tablet has persisted its state. Now she can delete the transaction and take the next one.
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
TxQueue.pop_front();
}

DeleteWriteId(tx.WriteId);
PQ_LOG_D("delete TxId " << tx.TxId);
Txs.erase(tx.TxId);
SetTxInFlyCounter();

// If this was the last transaction, then you need to send responses to messages about changes
// in the status of the PQ tablet (if they came)
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void DeleteExpiredTransactions(const TActorContext& ctx);
void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx);

void SetTxCompleteLagCounter();
void SetTxInFlyCounter();

bool CanProcessProposeTransactionQueue() const;
bool CanProcessPlanStepQueue() const;
bool CanProcessWriteTxs() const;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/counters_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ enum ESimpleCounters {
COUNTER_PQ_TABLET_OPENED_PIPES = 5 [(CounterOpts) = {Name: "OpenedPipes"}];
COUNTER_PQ_TABLET_INFLIGHT = 6 [(CounterOpts) = {Name: "RequestsInflight"}];

COUNTER_PQ_TABLET_TX_COMPLETE_LAG = 7 [(CounterOpts) = {Name: "TxCompleteLag"}];
COUNTER_PQ_TABLET_TX_IN_FLY = 8 [(CounterOpts) = {Name: "TxInFly"}];
}

enum EPercentileCounters {
Expand Down

0 comments on commit aeea1d1

Please sign in to comment.