Skip to content

Commit

Permalink
[monitor][txn] Add metrics for transaction (#15140)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjiuming authored Aug 12, 2022
1 parent ce419cb commit b7b2e37
Show file tree
Hide file tree
Showing 18 changed files with 694 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,10 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
stats.publishRateLimitedTimes = publishRateLimitedTimes;
TransactionBuffer txnBuffer = getTransactionBuffer();
stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();

subscriptions.forEach((name, subscription) -> {
SubscriptionStatsImpl subStats =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public class AggregatedNamespaceStats {
public long msgBacklog;
public long msgDelayed;

public long ongoingTxnCount;
public long abortedTxnCount;
public long committedTxnCount;

long backlogQuotaLimit;
long backlogQuotaLimitTime;

Expand Down Expand Up @@ -79,6 +83,10 @@ void updateStats(TopicStats stats) {
msgOutCounter += stats.msgOutCounter;
delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;

this.ongoingTxnCount += stats.ongoingTxnCount;
this.abortedTxnCount += stats.abortedTxnCount;
this.committedTxnCount += stats.committedTxnCount;

managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
stats.delayedTrackerMemoryUsage = tStatus.delayedMessageIndexSizeInBytes;
stats.abortedTxnCount = tStatus.abortedTxnCount;
stats.ongoingTxnCount = tStatus.ongoingTxnCount;
stats.committedTxnCount = tStatus.committedTxnCount;

stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
Expand Down Expand Up @@ -325,6 +328,9 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
metric(stream, cluster, namespace, "pulsar_txn_tb_active_total", stats.ongoingTxnCount);
metric(stream, cluster, namespace, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount);
metric(stream, cluster, namespace, "pulsar_txn_tb_committed_total", stats.committedTxnCount);
metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);

metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class TopicStats {
long bytesOutCounter;
double averageMsgSize;

long ongoingTxnCount;
long abortedTxnCount;
long committedTxnCount;

public long msgBacklog;
long publishRateLimitedTimes;

Expand Down Expand Up @@ -82,6 +86,10 @@ public void reset() {
bytesOutCounter = 0;
msgOutCounter = 0;

ongoingTxnCount = 0;
abortedTxnCount = 0;
committedTxnCount = 0;

managedLedgerStats.reset();
msgBacklog = 0;
publishRateLimitedTimes = 0L;
Expand Down Expand Up @@ -129,6 +137,13 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
splitTopicAndPartitionIndexLabel);

metric(stream, cluster, namespace, topic, "pulsar_txn_tb_active_total", stats.ongoingTxnCount,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_txn_tb_committed_total", stats.committedTxnCount,
splitTopicAndPartitionIndexLabel);

metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,12 @@ public interface TransactionBuffer {
* @return a completed future if isTxn is false; otherwise the future returned by takeSnapshot.
*/
CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn);



/**
 * Returns the number of transactions this buffer currently reports as ongoing.
 *
 * <p>Topic stats collection copies this value into the topic's {@code ongoingTxnCount}.
 *
 * @return the ongoing transaction count
 */
long getOngoingTxnCount();

/**
 * Returns the number of transactions this buffer reports as aborted.
 *
 * <p>Topic stats collection copies this value into the topic's {@code abortedTxnCount}.
 *
 * @return the aborted transaction count
 */
long getAbortedTxnCount();

/**
 * Returns the number of transactions this buffer reports as committed.
 *
 * <p>Topic stats collection copies this value into the topic's {@code committedTxnCount}.
 *
 * @return the committed transaction count
 */
long getCommittedTxnCount();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer;

import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;

/**
 * Recorder for statistics about commit and abort requests issued by the transaction buffer client.
 *
 * <p>Obtain an instance through {@link #create(boolean, TransactionBufferHandler, boolean)}.
 */
public interface TransactionBufferClientStats {

    /**
     * Records a failed abort request for {@code topic}.
     *
     * <p>{@code TransactionBufferClientImpl} calls this when an abort request's future completes
     * exceptionally.
     *
     * @param topic name of the topic the request targeted
     */
    void recordAbortFailed(String topic);

    /**
     * Records a failed commit request for {@code topic}.
     *
     * <p>{@code TransactionBufferClientImpl} calls this when a commit request's future completes
     * exceptionally.
     *
     * @param topic name of the topic the request targeted
     */
    void recordCommitFailed(String topic);

    /**
     * Records the latency of an abort request for {@code topic}.
     *
     * @param topic name of the topic the request targeted
     * @param nanos elapsed time in nanoseconds; {@code TransactionBufferClientImpl} passes a
     *              {@code System.nanoTime()} difference taken when the request completes without error
     */
    void recordAbortLatency(String topic, long nanos);

    /**
     * Records the latency of a commit request for {@code topic}.
     *
     * @param topic name of the topic the request targeted
     * @param nanos elapsed time in nanoseconds; {@code TransactionBufferClientImpl} passes a
     *              {@code System.nanoTime()} difference taken when the request completes without error
     */
    void recordCommitLatency(String topic, long nanos);

    /**
     * Closes this recorder. {@code TransactionBufferClientImpl#close()} calls it after closing its handler.
     */
    void close();

    /**
     * Selects the recorder implementation to use.
     *
     * @param exposeTopicMetrics   forwarded unchanged to {@code TransactionBufferClientStatsImpl.getInstance}
     * @param handler              forwarded unchanged to {@code TransactionBufferClientStatsImpl.getInstance}
     * @param enableTxnCoordinator when {@code false}, {@link #NOOP} is returned and the other
     *                             arguments are not used
     * @return {@link #NOOP} when {@code enableTxnCoordinator} is {@code false}; otherwise the result of
     *         {@code TransactionBufferClientStatsImpl.getInstance(exposeTopicMetrics, handler)}
     */
    static TransactionBufferClientStats create(boolean exposeTopicMetrics, TransactionBufferHandler handler,
                                               boolean enableTxnCoordinator) {
        if (!enableTxnCoordinator) {
            return NOOP;
        }
        return TransactionBufferClientStatsImpl.getInstance(exposeTopicMetrics, handler);
    }

    /**
     * Recorder whose methods all have empty bodies; returned by {@code create} when the
     * transaction coordinator is not enabled.
     */
    TransactionBufferClientStats NOOP = new TransactionBufferClientStats() {
        @Override
        public void recordAbortFailed(String topic) {
            // intentionally empty
        }

        @Override
        public void recordCommitFailed(String topic) {
            // intentionally empty
        }

        @Override
        public void recordAbortLatency(String topic, long nanos) {
            // intentionally empty
        }

        @Override
        public void recordCommitLatency(String topic, long nanos) {
            // intentionally empty
        }

        @Override
        public void close() {
            // intentionally empty
        }
    };
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,4 +388,28 @@ public TransactionBufferStats getStats(boolean lowWaterMarks) {
public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
return CompletableFuture.completedFuture(null);
}

/**
 * Counts the entries of {@code buffers} whose status equals {@code OPEN}, {@code COMMITTING}
 * or {@code ABORTING}.
 *
 * <p>Every call streams over all values of {@code buffers}.
 *
 * @return the number of buffers in one of those three states
 */
@Override
public long getOngoingTxnCount() {
    return buffers.values().stream()
            .filter(buf -> {
                // Checked in the same order as before: OPEN, then COMMITTING, then ABORTING.
                if (buf.status.equals(TxnStatus.OPEN)) {
                    return true;
                }
                if (buf.status.equals(TxnStatus.COMMITTING)) {
                    return true;
                }
                return buf.status.equals(TxnStatus.ABORTING);
            })
            .count();
}

/**
 * Counts the entries of {@code buffers} whose status equals {@code TxnStatus.ABORTED}.
 *
 * <p>Every call streams over all values of {@code buffers}.
 *
 * @return the number of buffers in the {@code ABORTED} state
 */
@Override
public long getAbortedTxnCount() {
    final long aborted = buffers.values().stream()
            .filter(buf -> buf.status.equals(TxnStatus.ABORTED))
            .count();
    return aborted;
}

/**
 * Counts the entries of {@code buffers} whose status equals {@code TxnStatus.COMMITTED}.
 *
 * <p>Every call streams over all values of {@code buffers}.
 *
 * @return the number of buffers in the {@code COMMITTED} state
 */
@Override
public long getCommittedTxnCount() {
    final long committed = buffers.values().stream()
            .filter(buf -> buf.status.equals(TxnStatus.COMMITTED))
            .count();
    return committed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -89,6 +90,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
// Incremented by 1 whenever an abort is added or the max read position changes; taking a snapshot resets it to 0.
private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();

private final LongAdder txnCommittedCounter = new LongAdder();

private final LongAdder txnAbortedCounter = new LongAdder();

private final Timer timer;

private final int takeSnapshotIntervalNumber;
Expand Down Expand Up @@ -258,6 +263,20 @@ public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxnEnabled)
}
}

/**
 * Reports the current size of {@code ongoingTxns}.
 *
 * @return the number of entries in {@code ongoingTxns} at the time of the call
 */
@Override
public long getOngoingTxnCount() {
    return ongoingTxns.size();
}

/**
 * Reports the running total held in {@code txnAbortedCounter}.
 *
 * <p>The value comes from {@link LongAdder#sum()}, so increments that race with this call may
 * or may not be included.
 *
 * @return the current sum of {@code txnAbortedCounter}
 */
@Override
public long getAbortedTxnCount() {
    final long abortedSoFar = txnAbortedCounter.sum();
    return abortedSoFar;
}

/**
 * Reports the running total held in {@code txnCommittedCounter}.
 *
 * <p>The value comes from {@link LongAdder#sum()}, so increments that race with this call may
 * or may not be included.
 *
 * @return the current sum of {@code txnCommittedCounter}
 */
@Override
public long getCommittedTxnCount() {
    final long committedSoFar = txnCommittedCounter.sum();
    return committedSoFar;
}

@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
Expand Down Expand Up @@ -322,6 +341,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
clearAbortedTransactions();
takeSnapshotByChangeTimes();
}
txnCommittedCounter.increment();
completableFuture.complete(null);
}

Expand Down Expand Up @@ -368,6 +388,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
clearAbortedTransactions();
takeSnapshotByChangeTimes();
}
txnAbortedCounter.increment();
completableFuture.complete(null);
handleLowWaterMark(txnID, lowWaterMark);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
Expand All @@ -35,47 +37,88 @@
public class TransactionBufferClientImpl implements TransactionBufferClient {

private final TransactionBufferHandler tbHandler;
private final TransactionBufferClientStats stats;

/**
 * Stores the handler and obtains the stats recorder from {@code TransactionBufferClientStats.create}.
 *
 * @param tbHandler               handler that performs the end-transaction requests
 * @param exposeTopicLevelMetrics passed through to {@code TransactionBufferClientStats.create}
 * @param enableTxnCoordinator    passed through to {@code TransactionBufferClientStats.create}
 */
// NOTE(review): the one-argument signature on the next line looks like the pre-change line kept by
// the diff view; the three-argument signature after it appears to be its replacement - confirm.
private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
private TransactionBufferClientImpl(TransactionBufferHandler tbHandler, boolean exposeTopicLevelMetrics,
boolean enableTxnCoordinator) {
this.tbHandler = tbHandler;
this.stats = TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler, enableTxnCoordinator);
}

/**
 * Creates a client backed by a new {@code TransactionBufferHandlerImpl}.
 *
 * <p>The topic-level-metrics and transaction-coordinator flags are read from the broker's
 * {@code ServiceConfiguration} and handed to the constructor.
 *
 * @param pulsarService           broker service; passed to the handler and used to read the configuration
 * @param timer                   passed to the handler
 * @param maxConcurrentRequests   passed to the handler
 * @param operationTimeoutInMills passed to the handler
 * @return the new client
 * @throws PulsarServerException declared by this method; where it is raised is not visible here
 */
public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarService, timer,
maxConcurrentRequests, operationTimeoutInMills);
// NOTE(review): the next return looks like the pre-change statement kept by the diff view;
// the statements after it appear to be its replacement - confirm.
return new TransactionBufferClientImpl(handler);

ServiceConfiguration config = pulsarService.getConfig();
boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus();
boolean enableTxnCoordinator = config.isTransactionCoordinatorEnabled();
return new TransactionBufferClientImpl(handler, exposeTopicLevelMetrics, enableTxnCoordinator);
}

/**
 * Sends a COMMIT end-transaction request for {@code topic} through the handler and records the outcome.
 *
 * <p>When the handler's future completes exceptionally, a commit failure is recorded for the topic.
 * Otherwise the elapsed {@code System.nanoTime()} interval is recorded as commit latency.
 *
 * @return the future produced by {@code whenComplete} on the handler's future
 */
@Override
public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long txnIdMostBits,
long txnIdLeastBits, long lowWaterMark) {
// NOTE(review): the next return looks like the pre-change statement kept by the diff view;
// the statements after it appear to be its replacement - confirm.
return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark);
long start = System.nanoTime();
return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark)
.whenComplete((__, t) -> {
if (null != t) {
this.stats.recordCommitFailed(topic);
} else {
this.stats.recordCommitLatency(topic, System.nanoTime() - start);
}
});
}

/**
 * Sends an ABORT end-transaction request for {@code topic} through the handler and records the outcome.
 *
 * <p>When the handler's future completes exceptionally, an abort failure is recorded for the topic.
 * Otherwise the elapsed {@code System.nanoTime()} interval is recorded as abort latency.
 *
 * @return the future produced by {@code whenComplete} on the handler's future
 */
@Override
public CompletableFuture<TxnID> abortTxnOnTopic(String topic, long txnIdMostBits,
long txnIdLeastBits, long lowWaterMark) {
// NOTE(review): the next return looks like the pre-change statement kept by the diff view;
// the statements after it appear to be its replacement - confirm.
return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.ABORT, lowWaterMark);
long start = System.nanoTime();
return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.ABORT, lowWaterMark)
.whenComplete((__, t) -> {
if (null != t) {
this.stats.recordAbortFailed(topic);
} else {
this.stats.recordAbortLatency(topic, System.nanoTime() - start);
}
});
}

/**
 * Sends a COMMIT end-transaction request for {@code subscription} on {@code topic} through the
 * handler and records the outcome.
 *
 * <p>The outcome is recorded with the same commit-failure and commit-latency calls used for
 * topic commits. Only {@code topic} is passed to the recorder; the subscription name is not.
 *
 * @return the future produced by {@code whenComplete} on the handler's future
 */
@Override
public CompletableFuture<TxnID> commitTxnOnSubscription(String topic, String subscription, long txnIdMostBits,
long txnIdLeastBits, long lowWaterMark) {
long start = System.nanoTime();
return tbHandler.endTxnOnSubscription(topic, subscription, txnIdMostBits, txnIdLeastBits,
// NOTE(review): the next line (ending in ';') looks like the pre-change line kept by the diff view;
// the line after it appears to be its replacement - confirm.
TxnAction.COMMIT, lowWaterMark);
TxnAction.COMMIT, lowWaterMark)
.whenComplete((__, t) -> {
if (null != t) {
this.stats.recordCommitFailed(topic);
} else {
this.stats.recordCommitLatency(topic, System.nanoTime() - start);
}
});
}

/**
 * Sends an ABORT end-transaction request for {@code subscription} on {@code topic} through the
 * handler and records the outcome.
 *
 * <p>The outcome is recorded with the same abort-failure and abort-latency calls used for
 * topic aborts. Only {@code topic} is passed to the recorder; the subscription name is not.
 *
 * @return the future produced by {@code whenComplete} on the handler's future
 */
@Override
public CompletableFuture<TxnID> abortTxnOnSubscription(String topic, String subscription,
long txnIdMostBits, long txnIdLeastBits, long lowWaterMark) {
long start = System.nanoTime();
return tbHandler.endTxnOnSubscription(topic, subscription, txnIdMostBits, txnIdLeastBits,
// NOTE(review): the next line (ending in ';') looks like the pre-change line kept by the diff view;
// the line after it appears to be its replacement - confirm.
TxnAction.ABORT, lowWaterMark);
TxnAction.ABORT, lowWaterMark)
.whenComplete((__, t) -> {
if (null != t) {
this.stats.recordAbortFailed(topic);

} else {
this.stats.recordAbortLatency(topic, System.nanoTime() - start);
}
});
}

/**
 * Closes the handler first, then the stats recorder.
 */
@Override
public void close() {
    // Handler before recorder, matching the original call order.
    tbHandler.close();
    stats.close();
}

@Override
Expand Down
Loading

0 comments on commit b7b2e37

Please sign in to comment.