Enable launch of jmxfetch app's executors as daemon, default to NO-daemon #237

Merged
merged 4 commits into from
Jul 8, 2019
31 changes: 25 additions & 6 deletions src/main/java/org/datadog/jmxfetch/App.java
@@ -48,6 +48,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
@@ -89,12 +90,12 @@ public App(AppConfig appConfig) {
this.appConfig = appConfig;

ExecutorService collectionThreadPool =
- Executors.newFixedThreadPool(appConfig.getThreadPoolSize());
+ buildExecutorService(appConfig.getThreadPoolSize());
collectionProcessor =
new TaskProcessor(collectionThreadPool, appConfig.getReporter());

ExecutorService recoveryThreadPool =
- Executors.newFixedThreadPool(appConfig.getReconnectionThreadPoolSize());
+ buildExecutorService(appConfig.getReconnectionThreadPoolSize());
recoveryProcessor = new TaskProcessor(recoveryThreadPool, appConfig.getReporter());

// setup client
@@ -263,7 +264,7 @@ private void clearInstances(Collection<Instance> instances) {
+ "previous one hogging threads");
recoveryProcessor.stop();
recoveryProcessor.setThreadPoolExecutor(
- Executors.newFixedThreadPool(appConfig.getReconnectionThreadPoolSize()));
+ buildExecutorService(appConfig.getReconnectionThreadPoolSize()));
}

List<TaskStatusHandler> statuses =
@@ -290,6 +291,24 @@ public TaskStatusHandler invoke(
}
}

+ /**
+ * Builds an {@link ExecutorService} of the specified fixed size. Threads will be created
+ * and executed as daemons if {@link AppConfig#isDaemon()} is true. Defaults to false.
+ *
+ * @param size The thread pool size
+ * @return The create executor
+ */
+ private ExecutorService buildExecutorService(int size) {
+ return Executors.newFixedThreadPool(size, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = Executors.defaultThreadFactory().newThread(runnable);
Contributor

I would suggest assigning a proper name to the thread... this would have greatly helped our troubleshooting. Maybe next time someone does some work here?

+ thread.setDaemon(appConfig.isDaemon());
+ return thread;
+ }
+ });
+ }
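
The review suggestion above, giving the pool threads a recognizable name, could look roughly like the following sketch. It is not part of this PR: the class name `NamedDaemonThreadFactory`, the `prefix` parameter, and the three-argument `buildExecutorService` are hypothetical names chosen for illustration. Only `Executors`, `ThreadFactory`, `Thread.setName`, and `Thread.setDaemon` come from the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not present in the PR: wraps the default factory,
// then applies a readable name and the daemon flag to each new thread.
public class NamedDaemonThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate = Executors.defaultThreadFactory();
    private final AtomicInteger counter = new AtomicInteger(1);
    private final String prefix;
    private final boolean daemon;

    public NamedDaemonThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = delegate.newThread(runnable);
        // Names look like "<prefix>-1", "<prefix>-2", ... in thread dumps.
        thread.setName(prefix + "-" + counter.getAndIncrement());
        thread.setDaemon(daemon);
        return thread;
    }

    // Assumed variant of buildExecutorService that also takes a name prefix.
    public static ExecutorService buildExecutorService(int size, String prefix, boolean daemon) {
        return Executors.newFixedThreadPool(size, new NamedDaemonThreadFactory(prefix, daemon));
    }
}
```

With distinct prefixes for the collection and recovery pools, a thread dump would show which pool a stuck thread belongs to.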

private String getAutoDiscoveryName(String config) {
String[] splitted = config.split(System.getProperty("line.separator"), 2);

@@ -470,7 +489,7 @@ public void doIteration() {
+ "previous one hogging threads");
collectionProcessor.stop();
collectionProcessor.setThreadPoolExecutor(
- Executors.newFixedThreadPool(appConfig.getThreadPoolSize()));
+ buildExecutorService(appConfig.getThreadPoolSize()));
}

List<TaskStatusHandler> statuses =
@@ -556,7 +575,7 @@ private void fixBrokenInstances(Reporter reporter) {
+ "previous one hogging threads");
recoveryProcessor.stop();
recoveryProcessor.setThreadPoolExecutor(
- Executors.newFixedThreadPool(appConfig.getReconnectionThreadPoolSize()));
+ buildExecutorService(appConfig.getReconnectionThreadPoolSize()));
}

Collections.shuffle(fixInstanceTasks);
@@ -885,7 +904,7 @@ public void init(boolean forceNewConnection) {
+ "previous one hogging threads");
recoveryProcessor.stop();
recoveryProcessor.setThreadPoolExecutor(
- Executors.newFixedThreadPool(appConfig.getReconnectionThreadPoolSize()));
+ buildExecutorService(appConfig.getReconnectionThreadPoolSize()));
}

List<TaskStatusHandler> statuses =
15 changes: 15 additions & 0 deletions src/main/java/org/datadog/jmxfetch/AppConfig.java
@@ -207,6 +207,14 @@ public class AppConfig {
@Builder.Default
private boolean targetDirectInstances = false;

+ /**
+ * Boolean setting to determine whether internal executors are launched as daemons or not.
+ * This is useful when JMXFetch is embedded in a client app, e.g. for the java tracer,
+ * so that the client app's exit doesn't block on the termination of these internal threads.
+ */
+ @Builder.Default
+ private boolean daemon = false;
+
// This is used by things like APM agent to provide configuration from resources
private List<String> instanceConfigResources;
// This is used by things like APM agent to provide metric configuration from resources
@@ -365,4 +373,11 @@ public Integer getRefreshBeansPeriod() {
public Map<String, String> getGlobalTags() {
return globalTags;
}
+
+ /**
+ * @return Whether or not internal threads will be run as daemon.
+ */
+ public boolean isDaemon() {
+ return daemon;
+ }
}
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ public void setThreadPoolExecutor(ExecutorService executor) {
* */
public boolean ready() {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolExecutor;
- return !(tpe.getMaximumPoolSize() == tpe.getActiveCount());
+ return !tpe.isTerminated() && !(tpe.getMaximumPoolSize() == tpe.getActiveCount());
Member

just a question: is this change needed as well for the purposes of the PR, or is it an "unrelated" improvement?

Member Author

@labbati labbati Jul 4, 2019

This is required because, when running as a daemon, the underlying thread pool executor can be terminated before the shutdown hook runs. The problem is that, starting from the shutdown hook, we check whether a thread pool executor is ready and, if it is not, we recreate it.

I had a false positive (ready) when it was actually terminated.

I did not add a separate method because, looking at the surrounding code, I thought that adding this check to the ready function made sense.

Totally open to that, though.

Member

thanks for the explanation!

Hmm, looking at where the ready function is used, and the case you described: does adding the condition here actually solve the issue you mentioned?

i.e.: if the underlying thread pool executor is terminated before the shutdown hook is run, the ready function here would return false, so the logic in the App would recreate a new thread pool executor. Is that the desired behavior?

Member Author

Thanks for looking into this. This is the expected behavior based on my investigation. The thread pool is recreated (to do some final work) and then once that work is done it will automatically exit.

I tested both in web apps (i.e. long-running processes) and short-lived CLI apps, and this is the behavior I observed.

Anyway, if this rings a bell for you, I can look into it further. Do you have a specific concern I should look for?

Member Author

Hi @olivielpeau, does my answer make sense to you? Please let me know

Member

Ok, thanks for the explanation, makes sense!

}
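
The false positive discussed in the thread above can be reproduced with a small standalone sketch. It assumes nothing about JMXFetch beyond the two `ready()` expressions shown in the diff; the class and method names (`ReadyCheckDemo`, `readyWithoutTerminationCheck`, `readyWithTerminationCheck`, `terminatedPool`) are made up for illustration. A terminated pool has no active threads, so comparing `getMaximumPoolSize()` with `getActiveCount()` alone reports it as ready.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: compares the ready() expression before and after the change.
public class ReadyCheckDemo {
    // Expression before the change: ready unless every thread is busy.
    public static boolean readyWithoutTerminationCheck(ThreadPoolExecutor tpe) {
        return !(tpe.getMaximumPoolSize() == tpe.getActiveCount());
    }

    // Expression after the change: a terminated pool is never ready.
    public static boolean readyWithTerminationCheck(ThreadPoolExecutor tpe) {
        return !tpe.isTerminated() && !(tpe.getMaximumPoolSize() == tpe.getActiveCount());
    }

    // Builds a fixed pool of two threads and shuts it down before any task runs.
    public static ThreadPoolExecutor terminatedPool() {
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        tpe.shutdown();
        try {
            tpe.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return tpe;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor tpe = terminatedPool();
        System.out.println("terminated: " + tpe.isTerminated());
        System.out.println("old check says ready: " + readyWithoutTerminationCheck(tpe));
        System.out.println("new check says ready: " + readyWithTerminationCheck(tpe));
    }
}
```

With the extra `isTerminated()` condition, the caller sees the pool as not ready and replaces it, which matches the behavior described in the replies above.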

/**