From 75244803e17d49a40a84276e19201e3a76a784af Mon Sep 17 00:00:00 2001 From: Mathis Date: Fri, 11 Mar 2022 14:17:05 +0100 Subject: [PATCH] Make StatsD client queue size and telemetry configurable --- .../java/org/datadog/jmxfetch/AppConfig.java | 43 ++++++++++++++++--- .../jmxfetch/converter/ReporterConverter.java | 12 ------ .../jmxfetch/reporter/ReporterFactory.java | 23 +++++++--- .../jmxfetch/reporter/StatsdReporter.java | 20 ++++++++- .../jmxfetch/TestParsingJCommander.java | 27 +++++++++++- 5 files changed, 97 insertions(+), 28 deletions(-) delete mode 100644 src/main/java/org/datadog/jmxfetch/converter/ReporterConverter.java diff --git a/src/main/java/org/datadog/jmxfetch/AppConfig.java b/src/main/java/org/datadog/jmxfetch/AppConfig.java index f2d599b86..3d6e1acc3 100644 --- a/src/main/java/org/datadog/jmxfetch/AppConfig.java +++ b/src/main/java/org/datadog/jmxfetch/AppConfig.java @@ -2,14 +2,13 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; - import lombok.Builder; import lombok.ToString; import org.datadog.jmxfetch.converter.ExitWatcherConverter; -import org.datadog.jmxfetch.converter.ReporterConverter; import org.datadog.jmxfetch.reporter.ConsoleReporter; import org.datadog.jmxfetch.reporter.JsonReporter; import org.datadog.jmxfetch.reporter.Reporter; +import org.datadog.jmxfetch.reporter.ReporterFactory; import org.datadog.jmxfetch.service.ServiceNameProvider; import org.datadog.jmxfetch.validator.LogLevelValidator; import org.datadog.jmxfetch.validator.PositiveIntegerValidator; @@ -57,6 +56,9 @@ public class AppConfig { private static final int DEFAULT_THREAD_POOL_SIZE = 3; private static final int DEFAULT_COLLECTION_TO_S = 60; private static final int DEFAULT_RECONNECTION_TO_S = 60; + private static final int DEFAULT_STATSD_QUEUE_SIZE = 4096; + + private Reporter reporter; @Parameter( names = {"--help", "-h"}, @@ -111,9 +113,22 @@ public class AppConfig { + "\"statsd:unix://[STATSD_UNIX_SOCKET_PATH]\", " + "\"console\" or \"json\"", validateWith = ReporterValidator.class, - converter = ReporterConverter.class, + required = true) + private String reporterString; + + @Parameter( + names = {"--statsd_telemetry", "-st"}, + description = "Enable StatsD client telemetry reporting", required = false) - private Reporter reporter; + private boolean statsdTelemetry; + + @Parameter( + names = {"--statsd_queue_size", "-sq"}, + description = "Maximum number of unprocessed messages in the StatsD client queue.", + validateWith = PositiveIntegerValidator.class, + required = false) + @Builder.Default + private int statsdQueueSize = DEFAULT_STATSD_QUEUE_SIZE; @Parameter( names = {"--check", "-c"}, @@ -290,11 +305,11 @@ public String getAction() { } public boolean isConsoleReporter() { - return reporter != null && (reporter instanceof ConsoleReporter); + return getReporter() != null && (getReporter() instanceof ConsoleReporter); } public boolean isJsonReporter() { - return reporter != null && (reporter instanceof JsonReporter); + return getReporter() != null && (getReporter() instanceof JsonReporter); } public boolean isHelp() { @@ -345,7 +360,11 @@ public boolean getAutoDiscoveryPipeEnabled() { return adEnabled; } + /** Returns the Reporter for this app config. */ public Reporter getReporter() { + if (reporter == null && reporterString != null) { + reporter = ReporterFactory.getReporter(this); + } return reporter; } @@ -361,6 +380,18 @@ public String getTmpDirectory() { return tmpDirectory; } + public String getReporterString() { + return reporterString; + } + + public boolean getStatsdTelemetry() { + return statsdTelemetry; + } + + public int getStatsdQueueSize() { + return statsdQueueSize; + } + public String getLogLevel() { return logLevel; } diff --git a/src/main/java/org/datadog/jmxfetch/converter/ReporterConverter.java b/src/main/java/org/datadog/jmxfetch/converter/ReporterConverter.java deleted file mode 100644 index 3d81a7ae7..000000000 --- a/src/main/java/org/datadog/jmxfetch/converter/ReporterConverter.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.datadog.jmxfetch.converter; - -import com.beust.jcommander.IStringConverter; -import org.datadog.jmxfetch.reporter.Reporter; -import org.datadog.jmxfetch.reporter.ReporterFactory; - -public class ReporterConverter implements IStringConverter { - - public Reporter convert(String value) { - return ReporterFactory.getReporter(value); - } -} diff --git a/src/main/java/org/datadog/jmxfetch/reporter/ReporterFactory.java b/src/main/java/org/datadog/jmxfetch/reporter/ReporterFactory.java index 5865b23db..7aa36a938 100644 --- a/src/main/java/org/datadog/jmxfetch/reporter/ReporterFactory.java +++ b/src/main/java/org/datadog/jmxfetch/reporter/ReporterFactory.java @@ -1,15 +1,15 @@ package org.datadog.jmxfetch.reporter; -import org.datadog.jmxfetch.util.StringUtils; +import org.datadog.jmxfetch.AppConfig; -import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; public class ReporterFactory { - /** Gets the reporter for the corresponding type string (console, statsd). */ - public static Reporter getReporter(String type) { + /** Gets the reporter for the corresponding app config. */ + public static Reporter getReporter(AppConfig appConfig) { + String type = appConfig.getReporterString(); if (type == null || type.length() <= 0) { throw new IllegalArgumentException("Null or empty reporter type"); } @@ -23,17 +23,26 @@ public static Reporter getReporter(String type) { if (matcher.find() && matcher.groupCount() == 2) { String host = matcher.group(1); Integer port = Integer.valueOf(matcher.group(2)); - return new StatsdReporter(host, port); + return new StatsdReporter( + host, + port, + appConfig.getStatsdTelemetry(), + appConfig.getStatsdQueueSize()); } matcher = Pattern.compile("^statsd:unix://(.*)$").matcher(type); if (matcher.find() && matcher.groupCount() == 1) { String socketPath = matcher.group(1); - return new StatsdReporter(socketPath, 0); + return new StatsdReporter( + socketPath, + 0, + appConfig.getStatsdTelemetry(), + appConfig.getStatsdQueueSize()); } } throw new IllegalArgumentException("Invalid reporter type: " + type); } - private ReporterFactory() {} + private ReporterFactory() { + } } diff --git a/src/main/java/org/datadog/jmxfetch/reporter/StatsdReporter.java b/src/main/java/org/datadog/jmxfetch/reporter/StatsdReporter.java index 6cdfed65a..0bbdd1569 100644 --- a/src/main/java/org/datadog/jmxfetch/reporter/StatsdReporter.java +++ b/src/main/java/org/datadog/jmxfetch/reporter/StatsdReporter.java @@ -15,12 +15,16 @@ public class StatsdReporter extends Reporter { private StatsDClient statsDClient; private String statsdHost; private int statsdPort; + private Boolean telemetry; + private int queueSize; private long initializationTime; /** Constructor, instantiates statsd reported to provided host and port. */ - public StatsdReporter(String statsdHost, int statsdPort) { + public StatsdReporter(String statsdHost, int statsdPort, boolean telemetry, int queueSize) { this.statsdHost = statsdHost; this.statsdPort = statsdPort; + this.telemetry = telemetry; + this.queueSize = queueSize; this.init(); } @@ -34,10 +38,14 @@ private void init() { /* Create the StatsDClient with "entity-id" set to "none" to avoid having dogstatsd server adding origin tags, when the connection is done with UDS. */ + log.info("Initializing Statsd reporter with parameters host={} port={} telemetry={} " + + "queueSize={} entityId={}", + this.statsdHost, this.statsdPort, this.telemetry, this.queueSize, entityId); NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder() - .enableTelemetry(false) .hostname(this.statsdHost) .port(this.statsdPort) + .enableTelemetry(this.telemetry) + .queueSize(this.queueSize) .errorHandler(handler) .entityID(entityId); @@ -105,4 +113,12 @@ public String getStatsdHost() { public int getStatsdPort() { return statsdPort; } + + public boolean getTelemetry() { + return telemetry; + } + + public int getQueueSize() { + return queueSize; + } } diff --git a/src/test/java/org/datadog/jmxfetch/TestParsingJCommander.java b/src/test/java/org/datadog/jmxfetch/TestParsingJCommander.java index 42bb133ef..1a7faa499 100644 --- a/src/test/java/org/datadog/jmxfetch/TestParsingJCommander.java +++ b/src/test/java/org/datadog/jmxfetch/TestParsingJCommander.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; @@ -23,7 +24,7 @@ public class TestParsingJCommander { private static final String SINGLE_CHECK = "jmx.yaml"; private static final List MULTI_CHECK = Arrays.asList("jmx.yaml", "jmx-2.yaml"); private static final String STATUS_LOCATION = "/status/status_location"; - private static final String EXIT_FILE_LOCATION = "/status/exit_locationt"; + private static final String EXIT_FILE_LOCATION = "/status/exit_location"; private static final String IPC_HOSTNAME = "localhost"; private static final String IPC_PORT = "5001"; @@ -161,6 +162,8 @@ public void testParsingReporter() { assertTrue(appConfig.getReporter() instanceof StatsdReporter); assertEquals("localhost", ((StatsdReporter) appConfig.getReporter()).getStatsdHost()); assertEquals(10, ((StatsdReporter) appConfig.getReporter()).getStatsdPort()); + assertFalse(((StatsdReporter) appConfig.getReporter()).getTelemetry()); + assertEquals(4096, ((StatsdReporter) appConfig.getReporter()).getQueueSize()); // statsd reporter with custom ipv4 host params = @@ -232,6 +235,28 @@ public void testParsingReporter() { assertTrue(appConfig.getReporter() instanceof StatsdReporter); assertEquals("/path/to/dsd.socket", ((StatsdReporter) appConfig.getReporter()).getStatsdHost()); assertEquals(0, ((StatsdReporter) appConfig.getReporter()).getStatsdPort()); + + // Telemetry and queue size + params = + new String[] { + "-r", + "statsd:unix:///path/to/dsd.socket", + "-st", + "-sq", + "8192", + "--check", + SINGLE_CHECK, + "--conf_directory", + CONF_DIR, + AppConfig.ACTION_COLLECT + }; + appConfig = testCommand(params); + assertNotNull(appConfig.getReporter()); + assertTrue(appConfig.getReporter() instanceof StatsdReporter); + assertEquals("/path/to/dsd.socket", ((StatsdReporter) appConfig.getReporter()).getStatsdHost()); + assertEquals(0, ((StatsdReporter) appConfig.getReporter()).getStatsdPort()); + assertTrue(((StatsdReporter) appConfig.getReporter()).getTelemetry()); + assertEquals(8192, ((StatsdReporter) appConfig.getReporter()).getQueueSize()); } @Test