Skip to content

Commit

Permalink
[auto_discovery] pull configurations from API, and apply configs.
Browse files Browse the repository at this point in the history
[autodiscovery] check for json configs every iteration - improvments in pipeline.

[httpclient] url encoded content type with GETs

[httpclient] wrap responses.

[api] if response is 204 no updates + method refactor
  • Loading branch information
truthbk committed Sep 25, 2017
1 parent d65e514 commit 5e6c9d7
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 158 deletions.
160 changes: 119 additions & 41 deletions src/main/java/org/datadog/jmxfetch/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.io.IOUtils;
import org.datadog.jmxfetch.reporter.Reporter;
import org.datadog.jmxfetch.util.CustomLogger;
import org.datadog.jmxfetch.util.FileHelper;
Expand All @@ -38,6 +39,8 @@
import com.beust.jcommander.ParameterException;
import com.google.common.primitives.Bytes;

import com.fasterxml.jackson.core.JsonProcessingException;


@SuppressWarnings("unchecked")
public class App {
Expand All @@ -52,16 +55,24 @@ public class App {
private static final int AD_MAX_MAG_INSTANCES = 4; // 1000 instances ought to be enough for anyone

private static int loopCounter;
private AtomicBoolean reinit = new AtomicBoolean(false);
private int lastJSONConfigTS;
private HashMap<String, Object> adJSONConfigs;
private ConcurrentHashMap<String, YamlParser> configs;
private ConcurrentHashMap<String, YamlParser> adConfigs = new ConcurrentHashMap<String, YamlParser>();
private ConcurrentHashMap<String, YamlParser> adPipeConfigs = new ConcurrentHashMap<String, YamlParser>();
private ArrayList<Instance> instances = new ArrayList<Instance>();
private LinkedList<Instance> brokenInstances = new LinkedList<Instance>();
private AtomicBoolean reinit = new AtomicBoolean(false);

private AppConfig appConfig;
private HttpClient client;


public App(AppConfig appConfig) {
this.appConfig = appConfig;
// setup client
if (appConfig.remoteEnabled()) {
client = new HttpClient(appConfig.getIPCHost(), appConfig.getIPCPort(), false);
}
this.configs = getConfigs(appConfig);
}

Expand Down Expand Up @@ -120,6 +131,9 @@ public static void main(String[] args) {

LOGGER.info("JMX Fetch has started");

//set up the config status
config.updateStatus();

App app = new App(config);

// Initiate JMX Connections, get attributes that match the yaml configuration
Expand Down Expand Up @@ -243,7 +257,7 @@ void start() {
long delta_s = 0;
FileInputStream adPipe = null;

if(appConfig.getAutoDiscoveryEnabled()) {
if(appConfig.getAutoDiscoveryPipeEnabled()) {
LOGGER.info("Auto Discovery enabled");
adPipe = newAutoDiscoveryPipe();
try {
Expand All @@ -261,11 +275,11 @@ void start() {
System.exit(0);
}

// any SD configs waiting in pipe?
if(adPipe == null && appConfig.getAutoDiscoveryEnabled()) {
if(adPipe == null && appConfig.getAutoDiscoveryPipeEnabled()) {
// If SD is enabled and the pipe is not open, retry opening pipe
adPipe = newAutoDiscoveryPipe();
}
// any AutoDiscovery configs waiting?
try {
if(adPipe != null && adPipe.available() > 0) {
byte[] buffer = new byte[0];
Expand All @@ -291,10 +305,14 @@ void start() {
}
setReinit(processAutoDiscovery(buffer));
}

if(appConfig.remoteEnabled()) {
setReinit(getJSONConfigs());
}
} catch(IOException e) {
LOGGER.warn("Unable to read from pipe - Service Discovery configuration may have been skipped.");
} catch(Exception e) {
LOGGER.warn("Unknown problem parsing auto-discovery configuration: " + e);
LOGGER.warn("Problem parsing auto-discovery configuration: " + e);
}

long start = System.currentTimeMillis();
Expand All @@ -306,7 +324,7 @@ void start() {
doIteration();
} else {
LOGGER.warn("No instance could be initiated. Retrying initialization.");
appConfig.getStatus().flush(appConfig.getIPCPort());
appConfig.getStatus().flush();
configs = getConfigs(appConfig);
init(true);
}
Expand Down Expand Up @@ -425,7 +443,7 @@ public void doIteration() {
}

try {
appConfig.getStatus().flush(appConfig.getIPCPort());
appConfig.getStatus().flush();
} catch (Exception e) {
LOGGER.error("Unable to flush stats.", e);
}
Expand Down Expand Up @@ -460,7 +478,7 @@ public boolean addConfig(String name, YamlParser config) {
return false;
}

this.adConfigs.put(name, config);
this.adPipeConfigs.put(name, config);
this.setReinit(true);

return true;
Expand Down Expand Up @@ -508,6 +526,47 @@ private ConcurrentHashMap<String, YamlParser> getConfigs(AppConfig config) {
return configs;
}

private boolean getJSONConfigs() {
HttpClient.HttpResponse response;
boolean update = false;

if (this.client == null) {
return update;
}

try {
String uripath = "agent/jmxconfigs?timestamp="+lastJSONConfigTS;
response = client.request("GET", "", uripath);
if (!response.isResponse2xx()) {
LOGGER.warn("Failed collecting JSON configs: [" +
response.getResponseCode() +"] " +
response.getResponseBody());
return update;
} else if (response.getResponseCode() == 204) {
LOGGER.debug("No configuration changes...");
return update;
}

LOGGER.debug("Received the following JSON configs: " + response.getResponseBody());

InputStream jsonInputStream = IOUtils.toInputStream(response.getResponseBody(), "UTF-8");
JsonParser parser = new JsonParser(jsonInputStream);
int timestamp = ((Integer) parser.getJsonTimestamp()).intValue();
if (timestamp > lastJSONConfigTS) {
adJSONConfigs = (HashMap<String, Object>)parser.getJsonConfigs();
lastJSONConfigTS = timestamp;
update = true;
LOGGER.debug("update is in order - updating timestamp: " + lastJSONConfigTS);
}
} catch (JsonProcessingException e) {
LOGGER.error("error processing JSON response: " + e);
} catch (IOException e) {
LOGGER.error("unable to collect remote JMX configs: " + e);
}

return update;
}

private void reportStatus(AppConfig appConfig, Reporter reporter, Instance instance,
int metricCount, String message, String status) {
String checkName = instance.getCheckName();
Expand All @@ -525,17 +584,50 @@ private void sendServiceCheck(Reporter reporter, Instance instance, String messa
reporter.resetServiceCheckCount(checkName);
}

private void instantiate(LinkedHashMap<String, Object> instanceMap, LinkedHashMap<String, Object> initConfig,
String checkName, AppConfig appConfig, boolean forceNewConnection) {

Instance instance;
Reporter reporter = appConfig.getReporter();

try {
instance = new Instance(instanceMap, initConfig, checkName, appConfig);
} catch (Exception e) {
String warning = "Unable to create instance. Please check your yaml file";
appConfig.getStatus().addInitFailedCheck(checkName, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
return;
}

try {
// initiate the JMX Connection
instance.init(forceNewConnection);
instances.add(instance);
} catch (IOException e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = CANNOT_CONNECT_TO_INSTANCE + instance + ". " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
} catch (Exception e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = "Unexpected exception while initiating instance " + instance + " : " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
}
}

public void init(boolean forceNewConnection) {
clearInstances(instances);
clearInstances(brokenInstances);


Reporter reporter = appConfig.getReporter();


Iterator<Entry<String, YamlParser>> it = configs.entrySet().iterator();
// SD config cache doesn't remove configs - it just overwrites.
Iterator<Entry<String, YamlParser>> itSD = adConfigs.entrySet().iterator();
Iterator<Entry<String, YamlParser>> itSD = adPipeConfigs.entrySet().iterator();
while (it.hasNext() || itSD.hasNext()) {
Map.Entry<String, YamlParser> entry;
boolean sdIterator = false;
Expand All @@ -548,6 +640,7 @@ public void init(boolean forceNewConnection) {

String name = entry.getKey();
YamlParser yamlConfig = entry.getValue();
// AD config cache doesn't remove configs - it just overwrites.
if(!sdIterator) {
it.remove();
}
Expand All @@ -561,35 +654,20 @@ public void init(boolean forceNewConnection) {
}

for (LinkedHashMap<String, Object> configInstance : configInstances) {
Instance instance;
//Create a new Instance object
try {
instance = new Instance(configInstance, (LinkedHashMap<String, Object>) yamlConfig.getInitConfig(),
name, appConfig);
} catch (Exception e) {
String warning = "Unable to create instance. Please check your yaml file";
appConfig.getStatus().addInitFailedCheck(name, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
continue;
}
try {
// initiate the JMX Connection
instance.init(forceNewConnection);
instances.add(instance);
} catch (IOException e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = CANNOT_CONNECT_TO_INSTANCE + instance + ". " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
} catch (Exception e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = "Unexpected exception while initiating instance " + instance + " : " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
instantiate(configInstance, (LinkedHashMap<String, Object>) yamlConfig.getInitConfig(),
name, appConfig, forceNewConnection);
}
}

//Process JSON configurations
if (adJSONConfigs != null) {
for (String check : adJSONConfigs.keySet()) {
HashMap<String, Object> checkConfig = (HashMap<String, Object>) adJSONConfigs.get(check);
LinkedHashMap<String, Object> initConfig = (LinkedHashMap<String, Object>) checkConfig.get("init_config");
ArrayList<LinkedHashMap<String, Object>> configInstances = (ArrayList<LinkedHashMap<String, Object>>) checkConfig.get("instances");
for (LinkedHashMap<String, Object> configInstance : configInstances) {
instantiate(configInstance, initConfig, check, appConfig, forceNewConnection);
}
}
}
Expand Down
37 changes: 33 additions & 4 deletions src/main/java/org/datadog/jmxfetch/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import org.datadog.jmxfetch.converter.ExitWatcherConverter;
import org.datadog.jmxfetch.converter.ReporterConverter;
import org.datadog.jmxfetch.converter.StatusConverter;
import org.datadog.jmxfetch.reporter.ConsoleReporter;
import org.datadog.jmxfetch.reporter.Reporter;
import org.datadog.jmxfetch.validator.Log4JLevelValidator;
Expand Down Expand Up @@ -90,9 +89,8 @@ class AppConfig {

@Parameter(names = {"--status_location", "-s"},
description = "Absolute path of the status file. (default to null = no status file written)",
converter = StatusConverter.class,
required = false)
private Status status = new Status();
private String statusLocation;

@Parameter(names = {"--exit_file_location", "-e"},
description = "Absolute path of the trigger file to watch to exit. (default to null = no exit on file)",
Expand All @@ -106,12 +104,39 @@ class AppConfig {
required = true)
private List<String> action = null;

@Parameter(names = {"--ipc_host", "-H"},
description = "IPC host",
required = false)
private String ipcHost;

@Parameter(names = {"--ipc_port", "-I"},
description = "IPC port",
validateWith = PositiveIntegerValidator.class,
required = false)
private int ipcPort = 0;

private Status status = new Status();

public boolean updateStatus() {
if (statusLocation != null) {
status = new Status(statusLocation);
return true;
} else if (ipcHost != null && ipcPort > 0) {
status = new Status(ipcHost, ipcPort);
return true;
}

return false;
}

public boolean remoteEnabled() {
return (ipcHost != null && ipcPort > 0);
}

public String getStatusLocation() {
return this.statusLocation;
}

public String getAction() {
return this.action.get(0);
}
Expand All @@ -136,11 +161,15 @@ public int getCheckPeriod() {
return checkPeriod;
}

public String getIPCHost() {
return ipcHost;
}

public int getIPCPort() {
return ipcPort;
}

public boolean getAutoDiscoveryEnabled() {
public boolean getAutoDiscoveryPipeEnabled() {
return adEnabled;
}

Expand Down
Loading

0 comments on commit 5e6c9d7

Please sign in to comment.