From ba14ad01015df1f7d7cb1cb92cda618579b857a4 Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Thu, 24 Oct 2024 13:38:10 +0000 Subject: [PATCH 1/2] Adds the queue to the common tags for compactor (#5011) Backport the resource group metrics from 4.x Updates MetricsInfoImpl with the concept of `resource.group` tags for scan servers and compactors. --- .../accumulo/core/metrics/MetricsInfo.java | 16 +++++++- .../server/metrics/MetricsInfoImpl.java | 13 +++---- .../coordinator/CompactionCoordinator.java | 2 +- .../apache/accumulo/compactor/Compactor.java | 2 +- .../accumulo/compactor/CompactorTest.java | 39 ++++++++++++++----- .../accumulo/gc/SimpleGarbageCollector.java | 2 +- .../org/apache/accumulo/manager/Manager.java | 2 +- .../org/apache/accumulo/monitor/Monitor.java | 2 +- .../apache/accumulo/tserver/ScanServer.java | 5 +-- .../apache/accumulo/tserver/TabletServer.java | 2 +- .../test/functional/ZombieTServer.java | 2 +- 11 files changed, 58 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java index 4277e118875..f0962a5c1f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java @@ -52,6 +52,18 @@ static Tag processTag(final String processName) { return Tag.of("process.name", processName); } + /** + * Convenience method to create tag name / value pair for the resource group name + * + * @param resourceGroupName the resource group name + */ + static Tag resourceGroupTag(final String resourceGroupName) { + if (resourceGroupName == null || resourceGroupName.isEmpty()) { + return Tag.of("resource.group", "NOT_PROVIDED"); + } + return Tag.of("resource.group", resourceGroupName); + } + /** * Convenience method to create tag name / value pairs for the host and port from address * host:port pair. @@ -76,8 +88,10 @@ static List addressTags(final HostAndPort hostAndPort) { * * @param applicationName the application (process) name. * @param hostAndPort the host:port pair + * @oaram resourceGroup the resource group name */ - void addServiceTags(final String applicationName, final HostAndPort hostAndPort); + void addServiceTags(final String applicationName, final HostAndPort hostAndPort, + final String resourceGroup); /** * Add the list of tag name / value pair to the common tags that will be emitted with all metrics. diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java index 9053567dd5e..19adacc554e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java @@ -111,15 +111,14 @@ public boolean isMetricsEnabled() { * Common tags for all services. */ @Override - public void addServiceTags(final String applicationName, final HostAndPort hostAndPort) { + public void addServiceTags(final String applicationName, final HostAndPort hostAndPort, + final String resourceGroupName) { List tags = new ArrayList<>(); - if (applicationName != null && !applicationName.isEmpty()) { - tags.add(MetricsInfo.processTag(applicationName)); - } - if (hostAndPort != null) { - tags.addAll(MetricsInfo.addressTags(hostAndPort)); - } + tags.add(MetricsInfo.processTag(applicationName)); + tags.addAll(MetricsInfo.addressTags(hostAndPort)); + tags.add(MetricsInfo.resourceGroupTag(resourceGroupName)); + addCommonTags(tags); } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 37039adf681..dacd145a31b 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -275,7 +275,7 @@ public void run() { } MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), clientAddress); + metricsInfo.addServiceTags(getApplicationName(), clientAddress, ""); metricsInfo.init(); // On a re-start of the coordinator it's possible that external compactions are in-progress. diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 033bb8c79d3..e52b9bdb91c 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -686,7 +686,7 @@ public void run() { } MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), clientAddress); + metricsInfo.addServiceTags(getApplicationName(), clientAddress, queueName); metricsInfo.addMetricsProducers(this); metricsInfo.init(); diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index ece6b108d65..beeb8c5953a 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -193,8 +193,8 @@ public class SuccessfulCompactor extends Compactor { private TCompactionStatusUpdate latestState = null; SuccessfulCompactor(Supplier uuid, ServerAddress address, TExternalCompactionJob job, - ServerContext context, ExternalCompactionId eci) { - super(new CompactorServerOpts(), new String[] {"-q", "testQ"}); + ServerContext context, ExternalCompactionId eci, CompactorServerOpts compactorServerOpts) { + super(compactorServerOpts, new String[] {"-q", "testQ"}); this.uuid = uuid; this.address = address; this.job = job; @@ -277,8 +277,8 @@ public boolean isFailedCalled() { public class FailedCompactor extends SuccessfulCompactor { FailedCompactor(Supplier uuid, ServerAddress address, TExternalCompactionJob job, - ServerContext context, ExternalCompactionId eci) { - super(uuid, address, job, context, eci); + ServerContext context, ExternalCompactionId eci, CompactorServerOpts compactorServerOpts) { + super(uuid, address, job, context, eci, compactorServerOpts); } @Override @@ -292,8 +292,8 @@ protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, public class InterruptedCompactor extends SuccessfulCompactor { InterruptedCompactor(Supplier uuid, ServerAddress address, TExternalCompactionJob job, - ServerContext context, ExternalCompactionId eci) { - super(uuid, address, job, context, eci); + ServerContext context, ExternalCompactionId eci, CompactorServerOpts compactorServerOpts) { + super(uuid, address, job, context, eci, compactorServerOpts); } @Override @@ -345,6 +345,10 @@ public void testCompactionSucceeds() throws Exception { MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class); expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes(); + Compactor.CompactorServerOpts compactorServerOpts = + PowerMock.createNiceMock(Compactor.CompactorServerOpts.class); + expect(compactorServerOpts.getQueueName()).andReturn("testQ"); + ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class); ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class); expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); @@ -355,7 +359,8 @@ public void testCompactionSucceeds() throws Exception { PowerMock.replayAll(); - SuccessfulCompactor c = new SuccessfulCompactor(supplier, client, job, context, eci); + SuccessfulCompactor c = + new SuccessfulCompactor(supplier, client, job, context, eci, compactorServerOpts); c.run(); PowerMock.verifyAll(); @@ -396,6 +401,10 @@ public void testCompactionFails() throws Exception { MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class); expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes(); + Compactor.CompactorServerOpts compactorServerOpts = + PowerMock.createNiceMock(Compactor.CompactorServerOpts.class); + expect(compactorServerOpts.getQueueName()).andReturn("testQ"); + ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class); ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class); expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); @@ -406,7 +415,8 @@ public void testCompactionFails() throws Exception { PowerMock.replayAll(); - FailedCompactor c = new FailedCompactor(supplier, client, job, context, eci); + FailedCompactor c = + new FailedCompactor(supplier, client, job, context, eci, compactorServerOpts); c.run(); PowerMock.verifyAll(); @@ -448,6 +458,10 @@ public void testCompactionInterrupted() throws Exception { MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class); expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes(); + Compactor.CompactorServerOpts compactorServerOpts = + PowerMock.createNiceMock(Compactor.CompactorServerOpts.class); + expect(compactorServerOpts.getQueueName()).andReturn("testQ"); + ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class); ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class); expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); @@ -458,7 +472,8 @@ public void testCompactionInterrupted() throws Exception { PowerMock.replayAll(); - InterruptedCompactor c = new InterruptedCompactor(supplier, client, job, context, eci); + InterruptedCompactor c = + new InterruptedCompactor(supplier, client, job, context, eci, compactorServerOpts); c.run(); PowerMock.verifyAll(); @@ -481,9 +496,13 @@ public void testCompactionWaitProperty() { ServerContext context = PowerMock.createNiceMock(ServerContext.class); expect(context.getConfiguration()).andReturn(conf).anyTimes(); + Compactor.CompactorServerOpts compactorServerOpts = + PowerMock.createNiceMock(Compactor.CompactorServerOpts.class); + expect(compactorServerOpts.getQueueName()).andReturn(null); + PowerMock.replayAll(); - try (var c = new SuccessfulCompactor(null, null, null, context, null)) { + try (var c = new SuccessfulCompactor(null, null, null, context, null, compactorServerOpts)) { Long maxWait = c.getWaitTimeBetweenCompactionChecks(1); // compaction jitter means maxWait is between 0.9 and 1.1 of the desired value. assertTrue(maxWait >= 720L); diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 153cf698a22..45263d17449 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -167,7 +167,7 @@ public void run() { } MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), address); + metricsInfo.addServiceTags(getApplicationName(), address, ""); metricsInfo.addMetricsProducers(new GcMetrics(this)); metricsInfo.init(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index d02345905b7..dab0f107638 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1245,7 +1245,7 @@ public void run() { } MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), sa.getAddress()); + metricsInfo.addServiceTags(getApplicationName(), sa.getAddress(), ""); var producers = ManagerMetrics.getProducers(getConfiguration(), this); producers.add(balancerMetrics); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index a4bb9f76d7f..0ed4fd73dbc 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -498,7 +498,7 @@ public void run() { } MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), monitorHostAndPort); + metricsInfo.addServiceTags(getApplicationName(), monitorHostAndPort, ""); metricsInfo.init(); try { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 84cc046fbe2..beeee987082 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -126,8 +126,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; -import io.micrometer.core.instrument.Tag; - public class ScanServer extends AbstractServer implements TabletScanClientService.Iface, TabletHostingServer { @@ -404,8 +402,7 @@ public void run() { } MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), clientAddress); - metricsInfo.addCommonTags(List.of(Tag.of("resource.group", groupName))); + metricsInfo.addServiceTags(getApplicationName(), clientAddress, groupName); scanMetrics = new TabletServerScanMetrics(resourceManager::getOpenFiles); sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 24573dfd1f2..bcaa6f21088 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -761,7 +761,7 @@ public void run() { } MetricsInfo metricsInfo = context.getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), clientAddress); + metricsInfo.addServiceTags(getApplicationName(), clientAddress, ""); metrics = new TabletServerMetrics(this); updateMetrics = new TabletServerUpdateMetrics(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 5650c716707..7e9ba7c9ee0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -137,7 +137,7 @@ public static void main(String[] args) throws Exception { ServiceLock zlock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID()); MetricsInfo metricsInfo = context.getMetricsInfo(); - metricsInfo.addServiceTags("zombie.server", serverPort.address); + metricsInfo.addServiceTags("zombie.server", serverPort.address, ""); metricsInfo.init(); LockWatcher lw = new LockWatcher() { From e6d5fea0a186397bd9f39ff924636acc731c570c Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Thu, 24 Oct 2024 15:01:21 +0000 Subject: [PATCH 2/2] fix typo in javadoc --- .../main/java/org/apache/accumulo/core/metrics/MetricsInfo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java index f0962a5c1f8..45e4971c955 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java @@ -88,7 +88,7 @@ static List addressTags(final HostAndPort hostAndPort) { * * @param applicationName the application (process) name. * @param hostAndPort the host:port pair - * @oaram resourceGroup the resource group name + * @param resourceGroup the resource group name */ void addServiceTags(final String applicationName, final HostAndPort hostAndPort, final String resourceGroup);