Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service_discovery][jmx] trying to pick-up JMX changes with SD. #3010

Merged
merged 7 commits into from
Dec 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ embedded/*
dump.rdb
tests/core/fixtures/flare/dd*
.python-version
.ropeproject
.bundle
tags
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ Style/Documentation:
# Configuration parameters: Methods.
Style/SingleLineBlockParams:
Enabled: false

BlockLength:
Max: 110
102 changes: 100 additions & 2 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import signal
import sys
import time
import supervisor.xmlrpc
import xmlrpclib
from copy import copy

# For pickle & PID files, see issue 293
Expand All @@ -29,13 +31,19 @@
from checks.collector import Collector
from config import (
get_config,
get_jmx_pipe_path,
get_parsed_args,
get_system_stats,
load_check_directory,
load_check
load_check,
generate_jmx_configs,
_is_affirmative,
SD_PIPE_NAME

)
from daemon import AgentSupervisor, Daemon
from emitter import http_emitter
from utils.platform import Platform

# utils
from util import Watchdog
Expand All @@ -50,11 +58,18 @@
from utils.service_discovery.sd_backend import get_sd_backend

# Constants
from jmxfetch import JMX_CHECKS
PID_NAME = "dd-agent"
PID_DIR = None
WATCHDOG_MULTIPLIER = 10
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days

# Process name handed to supervisor's getProcessInfo/startProcess calls when
# checking on (and restarting) JMXFetch.
JMX_SUPERVISOR_ENTRY = 'datadog-agent:jmxfetch'
# Seconds slept right after asking supervisor to start JMXFetch, before the
# service discovery configs are written to the pipe.
JMX_GRACE_SECS = 2
# NOTE(review): not referenced in the visible code; presumably a naming prefix
# for service-discovery generated configs -- confirm against its users.
SERVICE_DISCOVERY_PREFIX = 'SD-'
# Marker line written ahead of every config pushed through the SD pipe.
SD_CONFIG_SEP = "#### SERVICE-DISCOVERY ####\n"

# Socket path used when the agent config has no 'supervisor_socket' entry.
DEFAULT_SUPERVISOR_SOCKET = '/opt/datadog-agent/run/datadog-supervisor.sock'
DEFAULT_COLLECTOR_PROFILE_INTERVAL = 20

# Globals
Expand All @@ -79,6 +94,9 @@ def __init__(self, pidfile, autorestart, start_event=True, in_developer_mode=Fal
# this flag can be set to True, False, or a list of checks (for partial reload)
self.reload_configs_flag = False
self.sd_backend = None
self.supervisor_proxy = None
self.sd_pipe = None


def _handle_sigterm(self, signum, frame):
"""Handles SIGTERM and SIGINT, which gracefully stops the agent."""
Expand All @@ -104,6 +122,7 @@ def reload_configs(self, checks_to_reload=set()):
Can also reload only an explicit set of checks."""
log.info("Attempting a configuration reload...")
hostname = get_hostname(self._agentConfig)
jmx_sd_configs = None

# if no check was given, reload them all
if not checks_to_reload:
Expand All @@ -113,13 +132,23 @@ def reload_configs(self, checks_to_reload=set()):
check.stop()

self._checksd = load_check_directory(self._agentConfig, hostname)
if self._jmx_service_discovery_enabled:
jmx_sd_configs = generate_jmx_configs(self._agentConfig, hostname)
else:
new_checksd = copy(self._checksd)

self.refresh_specific_checks(hostname, new_checksd, checks_to_reload)
jmx_checks = [check for check in checks_to_reload if check in JMX_CHECKS]
py_checks = set(checks_to_reload) - set(jmx_checks)
self.refresh_specific_checks(hostname, new_checksd, py_checks)
if self._jmx_service_discovery_enabled:
jmx_sd_configs = generate_jmx_configs(self._agentConfig, hostname, jmx_checks)

# once the reload is done, replace existing checks with the new ones
self._checksd = new_checksd

if jmx_sd_configs:
self._submit_jmx_service_discovery(jmx_sd_configs)

# Logging
num_checks = len(self._checksd['initialized_checks'])
if num_checks > 0:
Expand Down Expand Up @@ -216,9 +245,32 @@ def run(self, config=None):
if self._agentConfig.get('service_discovery'):
self.sd_backend = get_sd_backend(self._agentConfig)

if _is_affirmative(self._agentConfig.get('sd_jmx_enable')):
pipe_path = get_jmx_pipe_path()
if Platform.is_windows():
pipe_name = pipe_path.format(pipename=SD_PIPE_NAME)
else:
pipe_name = os.path.join(pipe_path, SD_PIPE_NAME)

if os.access(pipe_path, os.W_OK):
if not os.path.exists(pipe_name):
os.mkfifo(pipe_name)
self.sd_pipe = os.open(pipe_name, os.O_RDWR) # RW to avoid blocking (will only W)

# Initialize Supervisor proxy
self.supervisor_proxy = self._get_supervisor_socket(self._agentConfig)
else:
log.debug('Unable to create pipe in temporary directory. JMX service discovery disabled.')

# Load the checks.d checks
self._checksd = load_check_directory(self._agentConfig, hostname)

# Load JMX configs if available
if self._jmx_service_discovery_enabled:
jmx_sd_configs = generate_jmx_configs(self._agentConfig, hostname)
if jmx_sd_configs:
self._submit_jmx_service_discovery(jmx_sd_configs)

# Initialize the Collector
self.collector = Collector(self._agentConfig, emitters, systemStats, hostname)

Expand Down Expand Up @@ -342,6 +394,52 @@ def _set_agent_config_hostname(self, agentConfig):
log.info('Not running on EC2, using hostname to identify this server')
return agentConfig

def _get_supervisor_socket(self, agentConfig):
    """Return an XML-RPC proxy bound to supervisor's unix socket.

    The socket path is read from the 'supervisor_socket' entry of
    agentConfig and falls back to DEFAULT_SUPERVISOR_SOCKET.
    On Windows no proxy is built and None is returned.
    """
    if Platform.is_windows():
        return None

    socket_path = agentConfig.get('supervisor_socket', DEFAULT_SUPERVISOR_SOCKET)
    server_url = "unix://{socket}".format(socket=socket_path)
    unix_transport = supervisor.xmlrpc.SupervisorTransport(None, None, serverurl=server_url)

    # ServerProxy requires a URL argument; requests presumably travel through
    # the custom transport (i.e. the unix socket) rather than to this address.
    return xmlrpclib.ServerProxy('http://127.0.0.1', transport=unix_transport)

@property
def _jmx_service_discovery_enabled(self):
    """True once a service discovery pipe descriptor has been stored on the agent."""
    pipe_fd = self.sd_pipe
    return pipe_fd is not None

def _submit_jmx_service_discovery(self, jmx_sd_configs):
    """Send service-discovery generated JMX configs to JMXFetch via the SD pipe.

    When a supervisor proxy is available, JMXFetch's process state is looked
    up first and the process is started if it is STOPPED, EXITED or FATAL.
    Every config is then framed as SD_CONFIG_SEP, a "# <name>" comment line
    and the YAML text, and all frames are written to the pipe in one call.

    :param jmx_sd_configs: dict mapping a config name to its YAML string.
    :returns: None. Nothing is done when there are no configs or no pipe.
    :raises OSError: if writing to the pipe fails.
    """
    # Compare against None, as _jmx_service_discovery_enabled does, so that a
    # valid descriptor equal to 0 is not mistaken for "pipe not opened".
    if not jmx_sd_configs or self.sd_pipe is None:
        return

    if self.supervisor_proxy is not None:
        # Best effort: a failing supervisor RPC must not prevent the configs
        # from being written, nor bubble up into the caller.
        try:
            jmx_state = self.supervisor_proxy.supervisor.getProcessInfo(JMX_SUPERVISOR_ENTRY)
            log.debug("Current JMX check state: %s", jmx_state['statename'])
            # restart jmx if stopped
            if jmx_state['statename'] in ['STOPPED', 'EXITED', 'FATAL'] and self._agentConfig.get('sd_jmx_enable'):
                self.supervisor_proxy.supervisor.startProcess(JMX_SUPERVISOR_ENTRY)
                # presumably lets the fresh process come up before we write
                time.sleep(JMX_GRACE_SECS)
        except Exception:
            log.exception("Unable to check or start %s through supervisor.", JMX_SUPERVISOR_ENTRY)
    else:
        log.debug("Unable to automatically start jmxfetch on Windows via supervisor.")

    frames = []
    framed_names = []
    for name, config_yaml in jmx_sd_configs.iteritems():
        # Build each frame in one step so that a bad entry never leaves a
        # dangling separator/header behind in the payload.
        try:
            frame = "".join((SD_CONFIG_SEP, "# {}\n".format(name), config_yaml))
        except Exception as e:
            log.exception("unable to prepare JMX SD config %s for the named pipe: %s", name, e)
        else:
            frames.append(frame)
            framed_names.append(name)

    if frames:
        os.write(self.sd_pipe, "".join(frames))
        # Only report success once the data has actually been handed to the pipe.
        for name in framed_names:
            log.info("JMX SD Config %s submitted via named pipe successfully.", name)

def _should_restart(self):
if time.time() - self.agent_start > self.restart_interval:
return True
Expand Down
57 changes: 53 additions & 4 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from urlparse import urlparse

# project
from util import check_yaml
from util import check_yaml, config_to_yaml
from utils.platform import Platform, get_os
from utils.proxy import get_proxy
from utils.service_discovery.config import extract_agent_config
Expand All @@ -42,6 +42,9 @@
DEFAULT_CHECK_FREQUENCY = 15 # seconds
LOGGING_MAX_BYTES = 10 * 1024 * 1024
SDK_INTEGRATIONS_DIR = 'integrations'
# File name of the service discovery pipe; joined to the pipe directory on
# Unix, or substituted into the Windows template below.
SD_PIPE_NAME = "dd-service_discovery"
# Preferred directory for the pipe on non-Windows hosts; get_jmx_pipe_path()
# falls back to '/tmp' when this directory does not exist.
SD_PIPE_UNIX_PATH = '/opt/datadog-agent/run'
# Windows pipe path template; '{pipename}' is filled in with str.format().
SD_PIPE_WIN_PATH = "\\\\.\\pipe\\{pipename}"

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -72,6 +75,8 @@
"app.datad0g.com",
]

JMX_SD_CONF_TEMPLATE = '.jmx.{}.yaml'


class PathNotFound(Exception):
pass
Expand Down Expand Up @@ -376,7 +381,6 @@ def get_config(parse_args=True, cfg_path=None, options=None):
if options is not None and options.profile:
agentConfig['developer_mode'] = True

#
# Core config
#ap
if not config.has_option('Main', 'api_key'):
Expand Down Expand Up @@ -743,6 +747,16 @@ def get_sdk_integrations_path(osname=None):
return path
raise PathNotFound(path)

def get_jmx_pipe_path():
    """Return where the JMX service discovery pipe lives.

    On Windows this is the SD_PIPE_WIN_PATH template. Elsewhere it is
    SD_PIPE_UNIX_PATH when that directory exists, and '/tmp' otherwise.
    """
    if Platform.is_windows():
        return SD_PIPE_WIN_PATH

    if os.path.isdir(SD_PIPE_UNIX_PATH):
        return SD_PIPE_UNIX_PATH

    return '/tmp'


def get_auto_confd_path(osname=None):
"""Used for service discovery which only works for Unix"""
Expand Down Expand Up @@ -873,6 +887,7 @@ def _service_disco_configs(agentConfig):
service_disco_configs = sd_backend.get_configs()
except Exception:
log.exception("Loading service discovery configurations failed.")
return {}
else:
service_disco_configs = {}

Expand Down Expand Up @@ -997,6 +1012,7 @@ def load_check_directory(agentConfig, hostname):
initialize. Only checks that have a configuration
file in conf.d will be returned. '''
from checks import AGENT_METRICS_CHECK_NAME
from jmxfetch import JMX_CHECKS

initialized_checks = {}
init_failed_checks = {}
Expand Down Expand Up @@ -1035,7 +1051,9 @@ def load_check_directory(agentConfig, hostname):

for check_name, service_disco_check_config in _service_disco_configs(agentConfig).iteritems():
# ignore this config from service disco if the check has been loaded through a file config
if check_name in initialized_checks or check_name in init_failed_checks:
if check_name in initialized_checks or \
check_name in init_failed_checks or \
check_name in JMX_CHECKS:
continue

sd_init_config, sd_instances = service_disco_check_config[1]
Expand Down Expand Up @@ -1065,12 +1083,14 @@ def load_check_directory(agentConfig, hostname):

def load_check(agentConfig, hostname, checkname):
"""Same logic as load_check_directory except it loads one specific check"""
from jmxfetch import JMX_CHECKS

agentConfig['checksd_hostname'] = hostname
osname = get_os()
checks_places = get_checks_places(osname, agentConfig)
for config_path in _file_configs_paths(osname, agentConfig):
check_name = _conf_path_to_check_name(config_path)
if check_name == checkname:
if check_name == checkname and check_name not in JMX_CHECKS:
conf_is_valid, check_config, invalid_check = _load_file_config(config_path, check_name, agentConfig)

if invalid_check and not conf_is_valid:
Expand All @@ -1092,6 +1112,35 @@ def load_check(agentConfig, hostname, checkname):

return None

def generate_jmx_configs(agentConfig, hostname, checknames=None):
"""Similar logic to load_check_directory for JMX checks"""
from jmxfetch import JMX_CHECKS

if not checknames:
checknames = JMX_CHECKS
agentConfig['checksd_hostname'] = hostname

# the check was not found, try with service discovery
generated = {}
for check_name, service_disco_check_config in _service_disco_configs(agentConfig).iteritems():
if check_name in checknames and check_name in JMX_CHECKS:
log.debug('Generating JMX config for: %s' % check_name)

sd_init_config, sd_instances = service_disco_check_config

check_config = {'init_config': sd_init_config,
'instances': sd_instances}

try:
yaml = config_to_yaml(check_config)
# generated["{}_{}".format(check_name, idx)] = yaml
generated["{}_{}".format(check_name, 0)] = yaml
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the _0 for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this was actually a product of an original misunderstanding on my part. But then I saw some utility for it (although I have to take another look at this code) - for example simultaneous containers for the same service, but with different tags or whatever, that could have different configurations. On the JMX side we'd use the check name as the key in the map, so this would allow us to support that in the future, that's why we have the commented line on the top. We can discuss and possibly remove.

log.debug("YAML generated: %s", yaml)
except Exception:
log.exception("Unable to generate YAML config for %s", check_name)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

except:
    log.exception("Unable to generate YAML config for %s", check_name)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

return generated

#
# logging

Expand Down
3 changes: 3 additions & 0 deletions datadog.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ gce_updated_hostname: yes
# and modify its value.
# sd_template_dir: /datadog/check_configs
#
# Enable JMX checks for service discovery
# sd_jmx_enable: no
#
# ========================================================================== #
# Other #
# ========================================================================== #
Expand Down
21 changes: 16 additions & 5 deletions jmxfetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
DEFAULT_CHECK_FREQUENCY,
get_confd_path,
get_config,
get_jmx_pipe_path,
get_logging_config,
PathNotFound,
_is_affirmative
)
from util import yLoader
from utils.jmx import JMX_FETCH_JAR_NAME, JMXFiles
Expand All @@ -46,6 +48,7 @@
}

_JVM_DEFAULT_MAX_MEMORY_ALLOCATION = " -Xmx200m"
_JVM_DEFAULT_SD_MAX_MEMORY_ALLOCATION = " -Xmx512m"
_JVM_DEFAULT_INITIAL_MEMORY_ALLOCATION = " -Xms50m"
JMXFETCH_MAIN_CLASS = "org.datadog.jmxfetch.App"
JMX_CHECKS = [
Expand Down Expand Up @@ -81,6 +84,7 @@ def __init__(self, confd_path, agentConfig):
self.agentConfig = agentConfig
self.logging_config = get_logging_config()
self.check_frequency = DEFAULT_CHECK_FREQUENCY
self.service_discovery = _is_affirmative(self.agentConfig.get('sd_jmx_enable', False))

self.jmx_process = None
self.jmx_checks = None
Expand Down Expand Up @@ -148,7 +152,7 @@ def run(self, command=None, checks_list=None, reporter=None, redirect_std_stream
except Exception:
log.exception("Error while writing JMX status file")

if len(self.jmx_checks) > 0:
if len(self.jmx_checks) > 0 or self.service_discovery:
return self._start(self.java_bin_path, self.java_options, self.jmx_checks,
command, reporter, self.tools_jar_path, self.custom_jar_paths, redirect_std_streams)
else:
Expand Down Expand Up @@ -273,13 +277,20 @@ def _start(self, path_to_java, java_run_opts, jmx_checks, command, reporter, too
subprocess_args.insert(len(subprocess_args) - 1, '--exit_file_location')
subprocess_args.insert(len(subprocess_args) - 1, path_to_exit_file)

subprocess_args.insert(4, '--check')
for check in jmx_checks:
subprocess_args.insert(5, check)
if self.service_discovery:
pipe_path = get_jmx_pipe_path()
subprocess_args.insert(4, '--tmp_directory')
subprocess_args.insert(5, pipe_path)
subprocess_args.insert(4, '--sd_standby')

if jmx_checks:
subprocess_args.insert(4, '--check')
for check in jmx_checks:
subprocess_args.insert(5, check)

# Specify a maximum memory allocation pool for the JVM
if "Xmx" not in java_run_opts and "XX:MaxHeapSize" not in java_run_opts:
java_run_opts += _JVM_DEFAULT_MAX_MEMORY_ALLOCATION
java_run_opts += _JVM_DEFAULT_SD_MAX_MEMORY_ALLOCATION if self.service_discovery else _JVM_DEFAULT_MAX_MEMORY_ALLOCATION
# Specify the initial memory allocation pool for the JVM
if "Xms" not in java_run_opts and "XX:InitialHeapSize" not in java_run_opts:
java_run_opts += _JVM_DEFAULT_INITIAL_MEMORY_ALLOCATION
Expand Down
Loading