Skip to content

Commit

Permalink
Expose broker bundles metrics to prometheus (apache#12366)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaozhangmin authored Nov 10, 2021
1 parent aa59f75 commit 0f9bcbc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean splitTopicAndPartitionLabelInPrometheus = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "Enable expose the broker bundles metrics."
)
private boolean exposeBunlesMetricsInPrometheus = false;

/**** --- Functions --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,9 @@ public LocalBrokerData updateLocalBrokerData() {
final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
localData.update(systemResourceUsage, getBundleStats());
updateLoadBalancingMetrics(systemResourceUsage);
if (conf.isExposeBunlesMetricsInPrometheus()) {
updateLoadBalancingBundlesMetrics(getBundleStats());
}
} catch (Exception e) {
log.warn("Error when attempting to update local broker data", e);
if (e instanceof ConcurrentModificationException) {
Expand All @@ -926,6 +929,33 @@ public LocalBrokerData updateLocalBrokerData() {
return localData;
}

/**
* As any broker, update its bundle metrics.
*
* @param bundlesData
*/
private void updateLoadBalancingBundlesMetrics(Map<String, NamespaceBundleStats> bundlesData) {
List<Metrics> metrics = Lists.newArrayList();
for (Map.Entry<String, NamespaceBundleStats> entry: bundlesData.entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
Map<String, String> dimensions = new HashMap<>();
dimensions.put("broker", pulsar.getAdvertisedAddress());
dimensions.put("bundle", bundle);
dimensions.put("metric", "loadBalancing");
Metrics m = Metrics.create(dimensions);
m.put("brk_bundle_msg_rate_in", stats.msgRateIn);
m.put("brk_bundle_msg_rate_out", stats.msgRateOut);
m.put("brk_bundle_topics_count", stats.topics);
m.put("brk_bundle_consumer_count", stats.consumerCount);
m.put("brk_bundle_producer_count", stats.producerCount);
m.put("brk_bundle_msg_throughput_in", stats.msgThroughputIn);
m.put("brk_bundle_msg_throughput_out", stats.msgThroughputOut);
metrics.add(m);
}
this.loadBalancingMetrics.set(metrics);
}

/**
* As any broker, update System Resource Usage Percentage.
*
Expand Down

0 comments on commit 0f9bcbc

Please sign in to comment.