-
Notifications
You must be signed in to change notification settings - Fork 814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[windows] new service and packaging #2417
Changes from 1 commit
8db437f
d04fbda
58d2751
1c449ff
f20306f
ef0a7c2
723c5df
4146514
22b5786
d6bc863
7506c39
fee135b
5840f8c
bdc971d
90cf16e
b24ef68
d0ca5f8
21fcfa0
c66ea26
befec72
df5c60b
f075565
34f551a
9c4fa7a
711f8bb
485b81c
bbe9119
f30225a
f003b19
bde7c15
ec26883
6f60943
4b1d0e2
369be52
bc82757
d0f4ca8
a1c2823
8d23019
c94b27b
e9e4eb2
5ad43e8
5fb54f2
b653adc
4f95aca
6f6cfb4
a8a2f91
35e8898
768519d
3a8eaed
dda342e
4ce664e
46fb184
63d6b21
5993b75
068c4b4
af04f0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,170 +1,271 @@ | ||
""" | ||
A Windows Service wrapper for win32/windows_supervisor.py which consists of a minimalistic | ||
Supervisor for our agent with restart tries in case of failure. This program | ||
will be packaged into an .exe file with py2exe in our omnibus build. It doesn't | ||
have any project dependencies and shouldn't. It just launches and talks to our | ||
Windows Supervisor in our Python env. This way the agent code isn't shipped in | ||
an .exe file which allows for easy hacking on its source code. | ||
|
||
Many thanks to the author of the article below which saved me quite some time: | ||
http://ryrobes.com/python/running-python-scripts-as-a-windows-service/ | ||
|
||
""" | ||
|
||
# set up logging before importing any other components | ||
from config import initialize_logging # noqa | ||
initialize_logging('supervisor') | ||
|
||
# stdlib | ||
import os | ||
import socket | ||
import select | ||
from collections import deque | ||
import logging | ||
|
||
# 3p | ||
import multiprocessing | ||
import os | ||
import psutil | ||
import win32event | ||
import win32service | ||
import signal | ||
import sys | ||
import time | ||
|
||
# win32 | ||
import win32api | ||
import win32serviceutil | ||
import servicemanager | ||
import win32service | ||
|
||
import ctypes | ||
from ctypes import wintypes, windll | ||
|
||
|
||
def _windows_commondata_path(): | ||
"""Return the common appdata path, using ctypes | ||
From http://stackoverflow.com/questions/626796/\ | ||
how-do-i-find-the-windows-common-application-data-folder-using-python | ||
""" | ||
CSIDL_COMMON_APPDATA = 35 | ||
# project | ||
from config import get_config | ||
from utils.jmx import JMXFiles | ||
|
||
_SHGetFolderPath = windll.shell32.SHGetFolderPathW | ||
_SHGetFolderPath.argtypes = [wintypes.HWND, | ||
ctypes.c_int, | ||
wintypes.HANDLE, | ||
wintypes.DWORD, wintypes.LPCWSTR] | ||
|
||
path_buf = wintypes.create_unicode_buffer(wintypes.MAX_PATH) | ||
_SHGetFolderPath(0, CSIDL_COMMON_APPDATA, 0, 0, path_buf) | ||
return path_buf.value | ||
log = logging.getLogger('supervisor') | ||
|
||
|
||
# Let's configure logging accordingly now (we need the above function for that) | ||
logging.basicConfig( | ||
filename=os.path.join(_windows_commondata_path(), 'Datadog', 'logs', 'service.log'), | ||
level=logging.DEBUG, | ||
format='%(asctime)s | %(levelname)s | %(name)s(%(filename)s:%(lineno)s) | %(message)s' | ||
) | ||
SERVICE_SLEEP_INTERVAL = 1 | ||
|
||
|
||
class AgentService(win32serviceutil.ServiceFramework): | ||
class AgentSvc(win32serviceutil.ServiceFramework): | ||
_svc_name_ = "DatadogAgent" | ||
_svc_display_name_ = "Datadog Agent" | ||
_svc_description_ = "Sends metrics to Datadog" | ||
|
||
# We use stock windows events to leverage windows capabilities to wait for | ||
# events to be triggered. It's a bit cleaner than a `while !self.stop_requested` | ||
# in our services SvcRun() loop :) | ||
# Oh and btw, h stands for "HANDLE", a common concept in the win32 C API | ||
h_wait_stop = win32event.CreateEvent(None, 0, 0, None) | ||
|
||
def __init__(self, args): | ||
|
||
win32serviceutil.ServiceFramework.__init__(self, args) | ||
|
||
current_dir = os.path.dirname(os.path.realpath(__file__)) | ||
# py2exe package | ||
# current_dir should be something like | ||
# C:\Program Files(x86)\Datadog\Datadog Agent\dist\library.zip\win32s | ||
self.agent_path = os.path.join(current_dir, '..', '..', '..', 'agent') | ||
|
||
self.agent_path = os.path.normpath(self.agent_path) | ||
logging.debug("Agent path: {0}".format(self.agent_path)) | ||
self.proc = None | ||
# Watch JMXFetch restarts | ||
self._MAX_JMXFETCH_RESTARTS = 3 | ||
self._agent_supervisor = AgentSupervisor() | ||
|
||
def SvcStop(self): | ||
""" Called when Windows wants to stop the service """ | ||
# Not even started | ||
if self.proc is None: | ||
logging.info('Supervisor was not yet started, stopping now.') | ||
# Started, and died | ||
elif not self.proc.is_running(): | ||
logging.info('Supervisor already exited. Some processes may still be alive. Stopping now.') | ||
# Still running | ||
else: | ||
logging.info("Stopping Supervisor...") | ||
# Soft termination based on TCP sockets to handle communication between the service | ||
# layer and the Windows Supervisor | ||
supervisor_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
supervisor_sock.connect(('localhost', 9001)) | ||
|
||
supervisor_sock.send("die".encode()) | ||
|
||
rlist, wlist, xlist = select.select([supervisor_sock], [], [], 15) | ||
if not rlist: | ||
# Ok some processes didn't want to die apparently, let's take care of them the hard | ||
# way ! | ||
logging.warning("Some processes are still alive. Killing them.") | ||
parent = psutil.Process(self.proc.pid) | ||
children = parent.children(recursive=True) | ||
|
||
for p in [parent] + children: | ||
p.kill() | ||
# Stop all services. | ||
self.running = False | ||
log.info('Stopping service...') | ||
self._agent_supervisor.stop() | ||
|
||
logging.info("Supervisor and its children processes exited, stopping now.") | ||
|
||
# We can sleep quietly now | ||
win32event.SetEvent(self.h_wait_stop) | ||
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING) | ||
|
||
def SvcDoRun(self): | ||
log.info('Starting service...') | ||
servicemanager.LogMsg( | ||
servicemanager.EVENTLOG_INFORMATION_TYPE, | ||
servicemanager.PYS_SERVICE_STARTED, | ||
(self._svc_name_, '') | ||
) | ||
self.start_ts = time.time() | ||
|
||
# Let's start our components and put our Supervisor's output into the | ||
# appropriate log file. This program also logs a minimalistic set of | ||
# lines in the same supervisor.log. | ||
self._agent_supervisor.run() | ||
|
||
logging.info("Starting Supervisor.") | ||
servicemanager.LogMsg( | ||
servicemanager.EVENTLOG_INFORMATION_TYPE, | ||
servicemanager.PYS_SERVICE_STOPPED, | ||
(self._svc_name_, '') | ||
) | ||
log.info("Service stopped.") | ||
|
||
|
||
class AgentSupervisor(object): | ||
devnull = None | ||
|
||
def __init__(self): | ||
AgentSupervisor.devnull = open(os.devnull, 'w') | ||
|
||
config = get_config(parse_args=False) | ||
|
||
# Let's have an uptime counter | ||
self.start_ts = None | ||
|
||
# Watch JMXFetch restarts | ||
self._MAX_JMXFETCH_RESTARTS = 3 | ||
self._count_jmxfetch_restarts = 0 | ||
|
||
# This file is somewhere in the dist directory of the agent | ||
file_dir = os.path.dirname(os.path.realpath(__file__)) | ||
search_dir, current_dir = os.path.split(file_dir) | ||
# So we go all the way up to the dist directory to find the actual agent dir | ||
while current_dir and current_dir != 'dist': | ||
search_dir, current_dir = os.path.split(search_dir) | ||
|
||
dd_dir = search_dir | ||
# If we don't find it, we use the default | ||
if not current_dir: | ||
dd_dir = os.path.join('C:\\', 'Program Files', 'Datadog', 'Datadog Agent') | ||
|
||
# cd to C:\Program Files\Datadog\Datadog Agent\agent | ||
os.chdir(os.path.join(dd_dir, 'agent')) | ||
|
||
# preparing a clean env for the agent processes | ||
env = os.environ.copy() | ||
if env.get('PYTHONPATH'): | ||
del env['PYTHONPATH'] | ||
if env.get('PYTHONHOME'): | ||
del env['PYTHONHOME'] | ||
if env['PATH'][-1] != ';': | ||
env['PATH'] += ';' | ||
log.info('env: %s', env) | ||
env['PATH'] += "{};{};".format(os.path.join(dd_dir, 'bin'), os.path.join(dd_dir, 'embedded')) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we also add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's remove it, we don't need this info in gohai anyway There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually you can disregard my comment: we do the same path handling on Linux so let's do the same thing here |
||
|
||
|
||
embedded_python = os.path.join(dd_dir, 'embedded', 'python.exe') | ||
# Keep a list of running processes so we can start/end as needed. | ||
# Processes will be started in order and stopped in reverse order. | ||
self.procs = { | ||
'forwarder': DDProcess( | ||
"forwarder", | ||
[embedded_python, "ddagent.py"], | ||
env | ||
), | ||
'collector': DDProcess( | ||
"collector", | ||
[embedded_python, "agent.py", "foreground", "--use-local-forwarder"], | ||
env | ||
), | ||
'dogstatsd': DDProcess( | ||
"dogstatsd", | ||
[embedded_python, "dogstatsd.py", "--use-local-forwarder"], | ||
env, | ||
enabled=config.get("use_dogstatsd", True) | ||
), | ||
'jmxfetch': DDProcess( | ||
"jmxfetch", | ||
[embedded_python, "jmxfetch.py"], | ||
env, | ||
max_restarts=self._MAX_JMXFETCH_RESTARTS | ||
), | ||
} | ||
|
||
def stop(self): | ||
# Stop all services. | ||
log.info("Stopping the agent processes...") | ||
self.running = False | ||
for proc in self.procs.values(): | ||
proc.terminate() | ||
AgentSupervisor.devnull.close() | ||
|
||
# Let's log the uptime | ||
if self.start_ts is None: | ||
self.start_ts = time.time() | ||
time.sleep(SERVICE_SLEEP_INTERVAL*2) | ||
secs = int(time.time()-self.start_ts) | ||
mins = int(secs/60) | ||
hours = int(secs/3600) | ||
log.info("Agent processes stopped.") | ||
log.info("Uptime: {0} hours {1} minutes {2} seconds".format(hours, mins % 60, secs % 60)) | ||
|
||
def run(self): | ||
self.start_ts = time.time() | ||
|
||
# Start all services. | ||
for proc in self.procs.values(): | ||
proc.start() | ||
|
||
# Loop to keep the service running since all DD services are | ||
# running in separate processes | ||
self.running = True | ||
while self.running: | ||
# Restart any processes that might have died. | ||
for name, proc in self.procs.iteritems(): | ||
if not proc.is_alive() and proc.is_enabled(): | ||
log.warning("%s has died. Restarting..." % name) | ||
proc.restart() | ||
|
||
time.sleep(SERVICE_SLEEP_INTERVAL) | ||
|
||
|
||
class DDProcess(object): | ||
""" | ||
Starts and monitors a Datadog process. | ||
Restarts when it exits until the limit set is reached. | ||
""" | ||
DEFAULT_MAX_RESTARTS = 5 | ||
_RESTART_TIMEFRAME = 3600 | ||
|
||
def __init__(self, name, command, env, enabled=True, max_restarts=None): | ||
self._name = name | ||
self._command = command | ||
self._env = env.copy() | ||
self._enabled = enabled | ||
self._proc = None | ||
self._restarts = deque([]) | ||
self._max_restarts = max_restarts or self.DEFAULT_MAX_RESTARTS | ||
|
||
def start(self): | ||
if self.is_enabled(): | ||
log.info("Starting %s.", self._name) | ||
self._proc = psutil.Popen(self._command, stdout=AgentSupervisor.devnull, stderr=AgentSupervisor.devnull, env=self._env) | ||
else: | ||
log.info("%s is not enabled, not starting it.", self._name) | ||
|
||
# Since we don't call terminate here, the execution of the supervisord | ||
# will be performed in a non blocking way. If an error is triggered | ||
# here, tell windows we're closing the service and report accordingly | ||
try: | ||
logging.debug('Changing working directory to "%s".', self.agent_path) | ||
os.chdir(self.agent_path) | ||
def stop(self): | ||
if self._proc is not None and self._proc.is_running(): | ||
log.info("Stopping %s...", self._name) | ||
self._proc.terminate() | ||
|
||
# This allows us to use the system's Python in case there is no embedded python | ||
embedded_python = os.path.normpath( | ||
os.path.join(self.agent_path, '..', 'embedded', 'python.exe') | ||
) | ||
if not os.path.isfile(embedded_python): | ||
embedded_python = "python" | ||
psutil.wait_procs([self._proc], timeout=3) | ||
|
||
if self._proc.is_running(): | ||
log.debug("%s didn't exit. Killing it.", self._name) | ||
self._proc.kill() | ||
|
||
log.info("%s is stopped.", self._name) | ||
else: | ||
log.info('%s was not running.', self._name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't seem useful. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched it to debug. I think it's still useful because it tells us the service didn't find the process. (more obvious than an absence of log) |
||
|
||
def terminate(self): | ||
self.stop() | ||
|
||
def is_alive(self): | ||
return self._proc is not None and self._proc.is_running() | ||
|
||
def is_enabled(self): | ||
return self._enabled | ||
|
||
self.proc = psutil.Popen([embedded_python, "windows_supervisor.py", "start", "server"]) | ||
except Exception: | ||
logging.exception("Error when launching Supervisor") | ||
self.SvcStop() | ||
def _can_restart(self): | ||
now = time.time() | ||
while(self._restarts and self._restarts[0] < now - self._RESTART_TIMEFRAME): | ||
self._restarts.popleft() | ||
|
||
return len(self._restarts) < self._max_restarts | ||
|
||
def restart(self): | ||
if not self._can_restart(): | ||
log.error( | ||
"{0} reached the limit of restarts ({1} tries during the last {2}s" | ||
" (max authorized: {3})). Not restarting." | ||
.format(self._name, len(self._restarts), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. --> there are multiple occurrences of this. |
||
self._RESTART_TIMEFRAME, self._max_restarts) | ||
) | ||
self._enabled = False | ||
return | ||
|
||
logging.info("Supervisor started.") | ||
self._restarts.append(time.time()) | ||
|
||
# Let's wait for our user to send a sigkill. We can't have RunSvc exit | ||
# before we actually kill our subprocess (the while True is just a | ||
# paranoia check in case win32event.INFINITE isn't really... infinite) | ||
while True: | ||
rc = win32event.WaitForSingleObject(self.h_wait_stop, win32event.INFINITE) | ||
if rc == win32event.WAIT_OBJECT_0: | ||
logging.info("Service stop requested.") | ||
break | ||
if self.is_alive(): | ||
self.stop() | ||
|
||
servicemanager.LogMsg( | ||
servicemanager.EVENTLOG_INFORMATION_TYPE, | ||
servicemanager.PYS_SERVICE_STOPPED, | ||
(self._svc_name_, '') | ||
) | ||
logging.info("Service stopped.") | ||
self.start() | ||
|
||
|
||
class JMXFetchProcess(DDProcess): | ||
def start(self): | ||
if self.is_enabled(): | ||
JMXFiles.clean_exit_file() | ||
super(JMXFetchProcess, self).start() | ||
|
||
def stop(self): | ||
""" | ||
Override `stop` method to properly exit JMXFetch. | ||
""" | ||
if self._proc is not None and self._proc.is_running(): | ||
JMXFiles.write_exit_file() | ||
super(JMXFetchProcess, self).stop() | ||
|
||
|
||
if __name__ == '__main__': | ||
# handle install, start, stop and uninstall | ||
win32serviceutil.HandleCommandLine(AgentService) | ||
multiprocessing.freeze_support() | ||
win32serviceutil.HandleCommandLine(AgentSvc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a similar message for
SvcStop
?