From 7a5d0862160cf17ddc95a3a0afa0626ec1e4653b Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Thu, 14 Oct 2021 18:22:38 +0800 Subject: [PATCH 1/3] export broker bundles data --- .../impl/ModularLoadManagerImpl.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 7668c116a3954..bb359526e084f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -915,6 +915,7 @@ public LocalBrokerData updateLocalBrokerData() { final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage); localData.update(systemResourceUsage, getBundleStats()); updateLoadBalancingMetrics(systemResourceUsage); + updateLoadBalancingBundlesMetrics(getBundleStats()); } catch (Exception e) { log.warn("Error when attempting to update local broker data", e); if (e instanceof ConcurrentModificationException) { @@ -926,6 +927,33 @@ public LocalBrokerData updateLocalBrokerData() { return localData; } + /** + * As any broker, update its bundle metrics. + * + * @param bundlesData + */ + private void updateLoadBalancingBundlesMetrics(Map bundlesData) { + List metrics = Lists.newArrayList(); + for (Map.Entry entry: bundlesData.entrySet()) { + final String bundle = entry.getKey(); + final NamespaceBundleStats stats = entry.getValue(); + Map dimensions = new HashMap<>(); + dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf, true)); + dimensions.put("bundle", bundle); + dimensions.put("metric", "loadBalancing"); + Metrics m = Metrics.create(dimensions); + m.put("brk_bundle_msg_rate_in", stats.msgRateIn); + m.put("brk_bundle_msg_rate_out", stats.msgRateOut); + m.put("brk_bundle_topics_count", stats.topics); + m.put("brk_bundle_consumer_count", stats.consumerCount); + m.put("brk_bundle_producer_count", stats.producerCount); + m.put("brk_bundle_msg_throughput_in", stats.msgThroughputIn); + m.put("brk_bundle_msg_throughput_out", stats.msgThroughputOut); + metrics.add(m); + } + this.loadBalancingMetrics.set(metrics); + } + /** * As any broker, update System Resource Usage Percentage. * From d18cc779284f5640706f079677ac77103ee2c9d9 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Fri, 5 Nov 2021 15:53:30 +0800 Subject: [PATCH 2/3] add configuration to control whether expose bundles metrics --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++ .../broker/loadbalance/impl/ModularLoadManagerImpl.java | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 832a389e58bb4..4e06c6b073630 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2102,6 +2102,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean splitTopicAndPartitionLabelInPrometheus = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Enable expose the broker bundles metrics." + ) + private boolean exposeBunlesMetricsInPrometheus = false; + /**** --- Functions --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index bb359526e084f..6f48ebcb85e79 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -915,7 +915,9 @@ public LocalBrokerData updateLocalBrokerData() { final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage); localData.update(systemResourceUsage, getBundleStats()); updateLoadBalancingMetrics(systemResourceUsage); - updateLoadBalancingBundlesMetrics(getBundleStats()); + if (conf.isExposeBunlesMetricsInPrometheus()) { + updateLoadBalancingBundlesMetrics(getBundleStats()); + } } catch (Exception e) { log.warn("Error when attempting to update local broker data", e); if (e instanceof ConcurrentModificationException) { From 5276c4510e9e419b4c629cfd9c0574cd45416b6b Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Fri, 5 Nov 2021 16:01:12 +0800 Subject: [PATCH 3/3] fix wrong method --- .../pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 6f48ebcb85e79..90eb962cd32bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -940,7 +940,7 @@ private void updateLoadBalancingBundlesMetrics(Map final String bundle = entry.getKey(); final NamespaceBundleStats stats = entry.getValue(); Map dimensions = new HashMap<>(); - dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf, true)); + dimensions.put("broker", pulsar.getAdvertisedAddress()); dimensions.put("bundle", bundle); dimensions.put("metric", "loadBalancing"); Metrics m = Metrics.create(dimensions);