Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make StatsD client queue size and telemetry configurable #390

Merged
merged 1 commit into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions src/main/java/org/datadog/jmxfetch/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;

import lombok.Builder;
import lombok.ToString;
import org.datadog.jmxfetch.converter.ExitWatcherConverter;
import org.datadog.jmxfetch.converter.ReporterConverter;
import org.datadog.jmxfetch.reporter.ConsoleReporter;
import org.datadog.jmxfetch.reporter.JsonReporter;
import org.datadog.jmxfetch.reporter.Reporter;
import org.datadog.jmxfetch.reporter.ReporterFactory;
import org.datadog.jmxfetch.service.ServiceNameProvider;
import org.datadog.jmxfetch.validator.LogLevelValidator;
import org.datadog.jmxfetch.validator.PositiveIntegerValidator;
Expand Down Expand Up @@ -57,6 +56,9 @@ public class AppConfig {
private static final int DEFAULT_THREAD_POOL_SIZE = 3;
private static final int DEFAULT_COLLECTION_TO_S = 60;
private static final int DEFAULT_RECONNECTION_TO_S = 60;
private static final int DEFAULT_STATSD_QUEUE_SIZE = 4096;

private Reporter reporter;

@Parameter(
names = {"--help", "-h"},
Expand Down Expand Up @@ -111,9 +113,22 @@ public class AppConfig {
+ "\"statsd:unix://[STATSD_UNIX_SOCKET_PATH]\", "
+ "\"console\" or \"json\"",
validateWith = ReporterValidator.class,
converter = ReporterConverter.class,
required = true)
private String reporterString;

@Parameter(
names = {"--statsd_telemetry", "-st"},
description = "Enable StatsD client telemetry reporting",
required = false)
private Reporter reporter;
private boolean statsdTelemetry;

@Parameter(
names = {"--statsd_queue_size", "-sq"},
description = "Maximum number of unprocessed messages in the StatsD client queue.",
validateWith = PositiveIntegerValidator.class,
required = false)
@Builder.Default
private int statsdQueueSize = DEFAULT_STATSD_QUEUE_SIZE;

@Parameter(
names = {"--check", "-c"},
Expand Down Expand Up @@ -290,11 +305,11 @@ public String getAction() {
}

public boolean isConsoleReporter() {
return reporter != null && (reporter instanceof ConsoleReporter);
return getReporter() != null && (getReporter() instanceof ConsoleReporter);
}

public boolean isJsonReporter() {
return reporter != null && (reporter instanceof JsonReporter);
return getReporter() != null && (getReporter() instanceof JsonReporter);
}

public boolean isHelp() {
Expand Down Expand Up @@ -345,7 +360,11 @@ public boolean getAutoDiscoveryPipeEnabled() {
return adEnabled;
}

/** Returns the Reporter for this app config. */
public Reporter getReporter() {
if (reporter == null && reporterString != null) {
reporter = ReporterFactory.getReporter(this);
}
return reporter;
}

Expand All @@ -361,6 +380,18 @@ public String getTmpDirectory() {
return tmpDirectory;
}

public String getReporterString() {
return reporterString;
}

public boolean getStatsdTelemetry() {
return statsdTelemetry;
}

public int getStatsdQueueSize() {
return statsdQueueSize;
}

public String getLogLevel() {
return logLevel;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package org.datadog.jmxfetch.reporter;

import org.datadog.jmxfetch.util.StringUtils;
import org.datadog.jmxfetch.AppConfig;

import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReporterFactory {

/** Gets the reporter for the corresponding type string (console, statsd). */
public static Reporter getReporter(String type) {
/** Gets the reporter for the corresponding app config. */
public static Reporter getReporter(AppConfig appConfig) {
String type = appConfig.getReporterString();
if (type == null || type.length() <= 0) {
throw new IllegalArgumentException("Null or empty reporter type");
}
Expand All @@ -23,17 +23,26 @@ public static Reporter getReporter(String type) {
if (matcher.find() && matcher.groupCount() == 2) {
String host = matcher.group(1);
Integer port = Integer.valueOf(matcher.group(2));
return new StatsdReporter(host, port);
return new StatsdReporter(
host,
port,
appConfig.getStatsdTelemetry(),
appConfig.getStatsdQueueSize());
}

matcher = Pattern.compile("^statsd:unix://(.*)$").matcher(type);
if (matcher.find() && matcher.groupCount() == 1) {
String socketPath = matcher.group(1);
return new StatsdReporter(socketPath, 0);
return new StatsdReporter(
socketPath,
0,
appConfig.getStatsdTelemetry(),
appConfig.getStatsdQueueSize());
}
}
throw new IllegalArgumentException("Invalid reporter type: " + type);
}

private ReporterFactory() {}
private ReporterFactory() {
}
}
20 changes: 18 additions & 2 deletions src/main/java/org/datadog/jmxfetch/reporter/StatsdReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ public class StatsdReporter extends Reporter {
private StatsDClient statsDClient;
private String statsdHost;
private int statsdPort;
private Boolean telemetry;
private int queueSize;
private long initializationTime;

/** Constructor, instantiates statsd reported to provided host and port. */
public StatsdReporter(String statsdHost, int statsdPort) {
public StatsdReporter(String statsdHost, int statsdPort, boolean telemetry, int queueSize) {
this.statsdHost = statsdHost;
this.statsdPort = statsdPort;
this.telemetry = telemetry;
this.queueSize = queueSize;
this.init();
}

Expand All @@ -34,10 +38,14 @@ private void init() {
/* Create the StatsDClient with "entity-id" set to "none" to avoid
having dogstatsd server adding origin tags, when the connection is
done with UDS. */
log.info("Initializing Statsd reporter with parameters host={} port={} telemetry={} "
+ "queueSize={} entityId={}",
this.statsdHost, this.statsdPort, this.telemetry, this.queueSize, entityId);
NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder()
.enableTelemetry(false)
.hostname(this.statsdHost)
.port(this.statsdPort)
.enableTelemetry(this.telemetry)
.queueSize(this.queueSize)
.errorHandler(handler)
.entityID(entityId);

Expand Down Expand Up @@ -105,4 +113,12 @@ public String getStatsdHost() {
public int getStatsdPort() {
return statsdPort;
}

public boolean getTelemetry() {
return telemetry;
}

public int getQueueSize() {
return queueSize;
}
}
27 changes: 26 additions & 1 deletion src/test/java/org/datadog/jmxfetch/TestParsingJCommander.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
Expand All @@ -23,7 +24,7 @@ public class TestParsingJCommander {
private static final String SINGLE_CHECK = "jmx.yaml";
private static final List<String> MULTI_CHECK = Arrays.asList("jmx.yaml", "jmx-2.yaml");
private static final String STATUS_LOCATION = "/status/status_location";
private static final String EXIT_FILE_LOCATION = "/status/exit_locationt";
private static final String EXIT_FILE_LOCATION = "/status/exit_location";
private static final String IPC_HOSTNAME = "localhost";
private static final String IPC_PORT = "5001";

Expand Down Expand Up @@ -161,6 +162,8 @@ public void testParsingReporter() {
assertTrue(appConfig.getReporter() instanceof StatsdReporter);
assertEquals("localhost", ((StatsdReporter) appConfig.getReporter()).getStatsdHost());
assertEquals(10, ((StatsdReporter) appConfig.getReporter()).getStatsdPort());
assertFalse(((StatsdReporter) appConfig.getReporter()).getTelemetry());
assertEquals(4096, ((StatsdReporter) appConfig.getReporter()).getQueueSize());

// statsd reporter with custom ipv4 host
params =
Expand Down Expand Up @@ -232,6 +235,28 @@ public void testParsingReporter() {
assertTrue(appConfig.getReporter() instanceof StatsdReporter);
assertEquals("/path/to/dsd.socket", ((StatsdReporter) appConfig.getReporter()).getStatsdHost());
assertEquals(0, ((StatsdReporter) appConfig.getReporter()).getStatsdPort());

// Telemetry and queue size
params =
new String[] {
"-r",
"statsd:unix:///path/to/dsd.socket",
"-st",
"-sq",
"8192",
"--check",
SINGLE_CHECK,
"--conf_directory",
CONF_DIR,
AppConfig.ACTION_COLLECT
};
appConfig = testCommand(params);
assertNotNull(appConfig.getReporter());
assertTrue(appConfig.getReporter() instanceof StatsdReporter);
assertEquals("/path/to/dsd.socket", ((StatsdReporter) appConfig.getReporter()).getStatsdHost());
assertEquals(0, ((StatsdReporter) appConfig.getReporter()).getStatsdPort());
assertTrue(((StatsdReporter) appConfig.getReporter()).getTelemetry());
assertEquals(8192, ((StatsdReporter) appConfig.getReporter()).getQueueSize());
}

@Test
Expand Down