Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api] collect instance configurations via API #156

Merged
merged 8 commits into from
Oct 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ jdk:
- oraclejdk8
- oraclejdk7
- openjdk7
- openjdk6

addons:
hostname: dd-jmxfetch-testhost
25 changes: 19 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<url>http://maven.apache.org</url>

<properties>
<maven.compiler.target>1.6</maven.compiler.target>
<commons-io.version>2.4</commons-io.version>
<commons-lang.version>2.6</commons-lang.version>
<apache-commons-lang3.version>3.5</apache-commons-lang3.version>
Expand All @@ -20,8 +21,8 @@
<jcommander.version>1.35</jcommander.version>
<junit.version>4.11</junit.version>
<log4j.version>1.2.17</log4j.version>
<gson.version>1.4</gson.version>
<mockito.version>2.2.27</mockito.version>
<jackson.version>2.9.0</jackson.version>
<mockito.version>2.2.27</mockito.version>
<maven-surefire-plugin.version>2.9</maven-surefire-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<snakeyaml.version>1.13</snakeyaml.version>
Expand Down Expand Up @@ -65,11 +66,23 @@
<groupId>com.datadoghq</groupId>
<artifactId>java-dogstatsd-client</artifactId>
<version>${java-dogstatsd-client.version}</version>
</dependency>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<!-- Note: core-annotations version x.y.0 is generally compatible with
(identical to) version x.y.1, x.y.2, etc. -->
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
Expand Down
164 changes: 123 additions & 41 deletions src/main/java/org/datadog/jmxfetch/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.io.IOUtils;
import org.datadog.jmxfetch.reporter.Reporter;
import org.datadog.jmxfetch.util.CustomLogger;
import org.datadog.jmxfetch.util.FileHelper;
Expand All @@ -38,6 +39,8 @@
import com.beust.jcommander.ParameterException;
import com.google.common.primitives.Bytes;

import com.fasterxml.jackson.core.JsonProcessingException;


@SuppressWarnings("unchecked")
public class App {
Expand All @@ -52,16 +55,24 @@ public class App {
private static final int AD_MAX_MAG_INSTANCES = 4; // 1000 instances ought to be enough for anyone

private static int loopCounter;
private AtomicBoolean reinit = new AtomicBoolean(false);
private int lastJSONConfigTS;
private HashMap<String, Object> adJSONConfigs;
private ConcurrentHashMap<String, YamlParser> configs;
private ConcurrentHashMap<String, YamlParser> adConfigs = new ConcurrentHashMap<String, YamlParser>();
private ConcurrentHashMap<String, YamlParser> adPipeConfigs = new ConcurrentHashMap<String, YamlParser>();
private ArrayList<Instance> instances = new ArrayList<Instance>();
private LinkedList<Instance> brokenInstances = new LinkedList<Instance>();
private AtomicBoolean reinit = new AtomicBoolean(false);

private AppConfig appConfig;
private HttpClient client;


public App(AppConfig appConfig) {
this.appConfig = appConfig;
// setup client
if (appConfig.remoteEnabled()) {
client = new HttpClient(appConfig.getIPCHost(), appConfig.getIPCPort(), false);
}
this.configs = getConfigs(appConfig);
}

Expand Down Expand Up @@ -120,6 +131,9 @@ public static void main(String[] args) {

LOGGER.info("JMX Fetch has started");

//set up the config status
config.updateStatus();

App app = new App(config);

// Initiate JMX Connections, get attributes that match the yaml configuration
Expand Down Expand Up @@ -243,7 +257,7 @@ void start() {
long delta_s = 0;
FileInputStream adPipe = null;

if(appConfig.getAutoDiscoveryEnabled()) {
if(appConfig.getAutoDiscoveryPipeEnabled()) {
LOGGER.info("Auto Discovery enabled");
adPipe = newAutoDiscoveryPipe();
try {
Expand All @@ -261,11 +275,11 @@ void start() {
System.exit(0);
}

// any SD configs waiting in pipe?
if(adPipe == null && appConfig.getAutoDiscoveryEnabled()) {
if(adPipe == null && appConfig.getAutoDiscoveryPipeEnabled()) {
// If SD is enabled and the pipe is not open, retry opening pipe
adPipe = newAutoDiscoveryPipe();
}
// any AutoDiscovery configs waiting?
try {
if(adPipe != null && adPipe.available() > 0) {
byte[] buffer = new byte[0];
Expand All @@ -291,10 +305,14 @@ void start() {
}
setReinit(processAutoDiscovery(buffer));
}

if(appConfig.remoteEnabled()) {
setReinit(getJSONConfigs());
}
} catch(IOException e) {
LOGGER.warn("Unable to read from pipe - Service Discovery configuration may have been skipped.");
} catch(Exception e) {
LOGGER.warn("Unknown problem parsing auto-discovery configuration: " + e);
LOGGER.warn("Problem parsing auto-discovery configuration: " + e);
}

long start = System.currentTimeMillis();
Expand All @@ -306,7 +324,7 @@ void start() {
doIteration();
} else {
LOGGER.warn("No instance could be initiated. Retrying initialization.");
appConfig.getStatus().flush(appConfig.getIPCPort());
appConfig.getStatus().flush();
configs = getConfigs(appConfig);
init(true);
}
Expand Down Expand Up @@ -425,7 +443,7 @@ public void doIteration() {
}

try {
appConfig.getStatus().flush(appConfig.getIPCPort());
appConfig.getStatus().flush();
} catch (Exception e) {
LOGGER.error("Unable to flush stats.", e);
}
Expand Down Expand Up @@ -460,12 +478,16 @@ public boolean addConfig(String name, YamlParser config) {
return false;
}

this.adConfigs.put(name, config);
this.adPipeConfigs.put(name, config);
this.setReinit(true);

return true;
}

public boolean addJsonConfig(String name, String json) {
return false;
}

private ConcurrentHashMap<String, YamlParser> getConfigs(AppConfig config) {
ConcurrentHashMap<String, YamlParser> configs = new ConcurrentHashMap<String, YamlParser>();
YamlParser fileConfig;
Expand Down Expand Up @@ -504,6 +526,47 @@ private ConcurrentHashMap<String, YamlParser> getConfigs(AppConfig config) {
return configs;
}

private boolean getJSONConfigs() {
HttpClient.HttpResponse response;
boolean update = false;

if (this.client == null) {
return update;
}

try {
String uripath = "agent/jmx/configs?timestamp="+lastJSONConfigTS;
response = client.request("GET", "", uripath);
if (!response.isResponse2xx()) {
LOGGER.warn("Failed collecting JSON configs: [" +
response.getResponseCode() +"] " +
response.getResponseBody());
return update;
} else if (response.getResponseCode() == 204) {
LOGGER.debug("No configuration changes...");
return update;
}

LOGGER.debug("Received the following JSON configs: " + response.getResponseBody());

InputStream jsonInputStream = IOUtils.toInputStream(response.getResponseBody(), "UTF-8");
JsonParser parser = new JsonParser(jsonInputStream);
int timestamp = ((Integer) parser.getJsonTimestamp()).intValue();
if (timestamp > lastJSONConfigTS) {
adJSONConfigs = (HashMap<String, Object>)parser.getJsonConfigs();
lastJSONConfigTS = timestamp;
update = true;
LOGGER.debug("update is in order - updating timestamp: " + lastJSONConfigTS);
}
} catch (JsonProcessingException e) {
LOGGER.error("error processing JSON response: " + e);
} catch (IOException e) {
LOGGER.error("unable to collect remote JMX configs: " + e);
}

return update;
}

private void reportStatus(AppConfig appConfig, Reporter reporter, Instance instance,
int metricCount, String message, String status) {
String checkName = instance.getCheckName();
Expand All @@ -521,17 +584,50 @@ private void sendServiceCheck(Reporter reporter, Instance instance, String messa
reporter.resetServiceCheckCount(checkName);
}

private void instantiate(LinkedHashMap<String, Object> instanceMap, LinkedHashMap<String, Object> initConfig,
String checkName, AppConfig appConfig, boolean forceNewConnection) {

Instance instance;
Reporter reporter = appConfig.getReporter();

try {
instance = new Instance(instanceMap, initConfig, checkName, appConfig);
} catch (Exception e) {
String warning = "Unable to create instance. Please check your yaml file";
appConfig.getStatus().addInitFailedCheck(checkName, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
return;
}

try {
// initiate the JMX Connection
instance.init(forceNewConnection);
instances.add(instance);
} catch (IOException e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = CANNOT_CONNECT_TO_INSTANCE + instance + ". " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
} catch (Exception e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = "Unexpected exception while initiating instance " + instance + " : " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
}
}

public void init(boolean forceNewConnection) {
clearInstances(instances);
clearInstances(brokenInstances);


Reporter reporter = appConfig.getReporter();


Iterator<Entry<String, YamlParser>> it = configs.entrySet().iterator();
// SD config cache doesn't remove configs - it just overwrites.
Iterator<Entry<String, YamlParser>> itSD = adConfigs.entrySet().iterator();
Iterator<Entry<String, YamlParser>> itSD = adPipeConfigs.entrySet().iterator();
while (it.hasNext() || itSD.hasNext()) {
Map.Entry<String, YamlParser> entry;
boolean sdIterator = false;
Expand All @@ -544,6 +640,7 @@ public void init(boolean forceNewConnection) {

String name = entry.getKey();
YamlParser yamlConfig = entry.getValue();
// AD config cache doesn't remove configs - it just overwrites.
if(!sdIterator) {
it.remove();
}
Expand All @@ -557,35 +654,20 @@ public void init(boolean forceNewConnection) {
}

for (LinkedHashMap<String, Object> configInstance : configInstances) {
Instance instance;
//Create a new Instance object
try {
instance = new Instance(configInstance, (LinkedHashMap<String, Object>) yamlConfig.getInitConfig(),
name, appConfig);
} catch (Exception e) {
String warning = "Unable to create instance. Please check your yaml file";
appConfig.getStatus().addInitFailedCheck(name, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
continue;
}
try {
// initiate the JMX Connection
instance.init(forceNewConnection);
instances.add(instance);
} catch (IOException e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = CANNOT_CONNECT_TO_INSTANCE + instance + ". " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
} catch (Exception e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = "Unexpected exception while initiating instance " + instance + " : " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
instantiate(configInstance, (LinkedHashMap<String, Object>) yamlConfig.getInitConfig(),
name, appConfig, forceNewConnection);
}
}

//Process JSON configurations
if (adJSONConfigs != null) {
for (String check : adJSONConfigs.keySet()) {
HashMap<String, Object> checkConfig = (HashMap<String, Object>) adJSONConfigs.get(check);
LinkedHashMap<String, Object> initConfig = (LinkedHashMap<String, Object>) checkConfig.get("init_config");
ArrayList<LinkedHashMap<String, Object>> configInstances = (ArrayList<LinkedHashMap<String, Object>>) checkConfig.get("instances");
for (LinkedHashMap<String, Object> configInstance : configInstances) {
instantiate(configInstance, initConfig, check, appConfig, forceNewConnection);
}
}
}
Expand Down
Loading