diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 692dd1504da69..b832c9568d718 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2103,6 +2103,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean splitTopicAndPartitionLabelInPrometheus = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Enable expose the broker bundles metrics." + ) + private boolean exposeBunlesMetricsInPrometheus = false; + /**** --- Functions --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 7668c116a3954..90eb962cd32bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -915,6 +915,9 @@ public LocalBrokerData updateLocalBrokerData() { final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage); localData.update(systemResourceUsage, getBundleStats()); updateLoadBalancingMetrics(systemResourceUsage); + if (conf.isExposeBunlesMetricsInPrometheus()) { + updateLoadBalancingBundlesMetrics(getBundleStats()); + } } catch (Exception e) { log.warn("Error when attempting to update local broker data", e); if (e instanceof ConcurrentModificationException) { @@ -926,6 +929,33 @@ public LocalBrokerData updateLocalBrokerData() { return localData; } + /** + * As any broker, update its bundle metrics. + * + * @param bundlesData + */ + private void updateLoadBalancingBundlesMetrics(Map bundlesData) { + List metrics = Lists.newArrayList(); + for (Map.Entry entry: bundlesData.entrySet()) { + final String bundle = entry.getKey(); + final NamespaceBundleStats stats = entry.getValue(); + Map dimensions = new HashMap<>(); + dimensions.put("broker", pulsar.getAdvertisedAddress()); + dimensions.put("bundle", bundle); + dimensions.put("metric", "loadBalancing"); + Metrics m = Metrics.create(dimensions); + m.put("brk_bundle_msg_rate_in", stats.msgRateIn); + m.put("brk_bundle_msg_rate_out", stats.msgRateOut); + m.put("brk_bundle_topics_count", stats.topics); + m.put("brk_bundle_consumer_count", stats.consumerCount); + m.put("brk_bundle_producer_count", stats.producerCount); + m.put("brk_bundle_msg_throughput_in", stats.msgThroughputIn); + m.put("brk_bundle_msg_throughput_out", stats.msgThroughputOut); + metrics.add(m); + } + this.loadBalancingMetrics.set(metrics); + } + /** * As any broker, update System Resource Usage Percentage. *