Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[windows] new service and packaging #2417

Merged
merged 56 commits into from
Feb 13, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
8db437f
[windows] Refactor Windows service layer
Aug 10, 2015
d04fbda
Created a process-based supervisor on windows
Aug 19, 2015
58d2751
[flare][windows] Add new windows logs to flare
Aug 20, 2015
1c449ff
[windows] Use the path of the final package
Aug 21, 2015
f20306f
[windows] Use sockets in our supervisor + path fix
Sep 4, 2015
ef0a7c2
[windows] Create a Thread-based watchdog
Sep 8, 2015
723c5df
[windows] Use same path as on Linux for jmxfetch
Sep 8, 2015
4146514
[windows] Removed embedded python shell + cleanup
Sep 16, 2015
22b5786
[windows] Code cleanup to be flake8 compliant
Sep 16, 2015
d6bc863
[windows] Bump down pywin32 version to 217
Sep 16, 2015
7506c39
[windows] Fixed service layer once and for all
Sep 16, 2015
fee135b
[windows] Better logging in service layer
Sep 16, 2015
5840f8c
[windows] Accurate error messages in supervisor
Sep 16, 2015
bdc971d
[windows] Add new logs to GUI and flare
Sep 16, 2015
90cf16e
Don't fetch service and supervisor logs on OSX
Sep 16, 2015
b24ef68
[windows] Add new logs to windows GUI
Sep 17, 2015
d0ca5f8
[windows] Add service log file to the menu
Sep 17, 2015
21fcfa0
[core] Different Watchdog classes for OS
degemer Feb 22, 2016
c66ea26
[agent] remove SIGHUP signal register on windows
degemer Mar 1, 2016
befec72
[watchdog] catch ImportError of resource on Windows
degemer Mar 1, 2016
df5c60b
[windows] add logfile for GUI
degemer Mar 1, 2016
f075565
[windows] put gohai and python in path for Popen
degemer Mar 1, 2016
34f551a
[windows] remove Guidata compatibility warnings
degemer Mar 13, 2016
9c4fa7a
[windows] rename service and supervisor
degemer Apr 1, 2016
711f8bb
[util] flake8 :fire:
degemer May 10, 2016
485b81c
Merge branch 'master' into quentin/windows-omnibus-5.0
degemer May 23, 2016
bbe9119
[windows] properly configure checksd path
degemer Jun 3, 2016
f30225a
Merge branch 'master' into quentin/windows-omnibus-5.0
degemer Jun 7, 2016
f003b19
[windows] query Registry if api_key is default
degemer Jun 28, 2016
bde7c15
Merge branch 'master' into quentin/windows-omnibus-5.0
degemer Jul 28, 2016
ec26883
[windows] 💚 CI
degemer Jul 28, 2016
6f60943
[windows] Windows Service rework
degemer Aug 2, 2016
4b1d0e2
Revert "[windows] Windows Service rework"
degemer Aug 4, 2016
369be52
Merge branch 'master' into quentin/windows-omnibus-5.0
degemer Nov 15, 2016
bc82757
fix flake8 errors
degemer Nov 16, 2016
d0f4ca8
Merge branch 'master' into quentin/windows-omnibus-5.0
degemer Dec 21, 2016
a1c2823
[config] do not fail _windows_extra_checksd_path
degemer Dec 21, 2016
8d23019
[agent] catch supervisor.xmlrpc import errors
degemer Jan 3, 2017
c94b27b
[windows] refactor Watchdog and config
degemer Jan 10, 2017
e9e4eb2
[windows] create new service
degemer Jan 11, 2017
5ad43e8
[windows] merge service and supervisor
degemer Jan 12, 2017
5fb54f2
[windows] remove files from previous build system
degemer Jan 13, 2017
b653adc
[windows] do not run jmxfetch if not enabled
degemer Jan 13, 2017
4f95aca
Merge branch 'master' into quentin/windows-omnibus-5.0
degemer Jan 13, 2017
6f6cfb4
[windows] remove test of old service
degemer Jan 13, 2017
a8a2f91
[windows] let the service handle the conf update
degemer Jan 18, 2017
35e8898
[agent] remove unused can_write_conf arg
degemer Jan 19, 2017
768519d
[windows] add more logs for the config write
degemer Jan 19, 2017
3a8eaed
[windows] fix config write
degemer Jan 19, 2017
dda342e
[windows] address review comments
degemer Jan 25, 2017
4ce664e
[windows] read from registry if default apikey
degemer Jan 25, 2017
46fb184
[windows] re-add shell.py
degemer Jan 25, 2017
63d6b21
[windows] Call DDProcess functions in JMXFetchP
degemer Feb 3, 2017
5993b75
[windows] address review comments
degemer Feb 7, 2017
068c4b4
Merge branch 'master' into quentin/windows-omnibus-5.0
degemer Feb 7, 2017
af04f0b
[service] properly exit or kill JMXFetch
degemer Feb 7, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
357 changes: 229 additions & 128 deletions win32/service.py
Original file line number Diff line number Diff line change
@@ -1,170 +1,271 @@
"""
A Windows Service wrapper for win32/windows_supervisor.py which consists in a minimalistic
Supervisor for our agent with restart tries in case of failure. This program
will be packaged into an .exe file with py2exe in our omnibus build. It doesn't
have any project dependencies and shouldn't. It just launches and talks to our
Windows Supervisor in our Python env. This way the agent code isn't shipped in
an .exe file which allows for easy hacking on it's source code.

Many thanks to the author of the article below which saved me quite some time:
http://ryrobes.com/python/running-python-scripts-as-a-windows-service/

"""

# set up logging before importing any other components
from config import initialize_logging # noqa
initialize_logging('supervisor')

# stdlib
import os
import socket
import select
from collections import deque
import logging

# 3p
import multiprocessing
import os
import psutil
import win32event
import win32service
import signal
import sys
import time

# win32
import win32api
import win32serviceutil
import servicemanager
import win32service

import ctypes
from ctypes import wintypes, windll


def _windows_commondata_path():
"""Return the common appdata path, using ctypes
From http://stackoverflow.com/questions/626796/\
how-do-i-find-the-windows-common-application-data-folder-using-python
"""
CSIDL_COMMON_APPDATA = 35
# project
from config import get_config
from utils.jmx import JMXFiles

_SHGetFolderPath = windll.shell32.SHGetFolderPathW
_SHGetFolderPath.argtypes = [wintypes.HWND,
ctypes.c_int,
wintypes.HANDLE,
wintypes.DWORD, wintypes.LPCWSTR]

path_buf = wintypes.create_unicode_buffer(wintypes.MAX_PATH)
_SHGetFolderPath(0, CSIDL_COMMON_APPDATA, 0, 0, path_buf)
return path_buf.value
log = logging.getLogger('supervisor')


# Let's configure logging accordingly now (we need the above function for that)
logging.basicConfig(
filename=os.path.join(_windows_commondata_path(), 'Datadog', 'logs', 'service.log'),
level=logging.DEBUG,
format='%(asctime)s | %(levelname)s | %(name)s(%(filename)s:%(lineno)s) | %(message)s'
)
SERVICE_SLEEP_INTERVAL = 1


class AgentService(win32serviceutil.ServiceFramework):
class AgentSvc(win32serviceutil.ServiceFramework):
_svc_name_ = "DatadogAgent"
_svc_display_name_ = "Datadog Agent"
_svc_description_ = "Sends metrics to Datadog"

# We use stock windows events to leverage windows capabilities to wait for
# events to be triggered. It's a bit cleaner than a `while !self.stop_requested`
# in our services SvcRun() loop :)
# Oh and btw, h stands for "HANDLE", a common concept in the win32 C API
h_wait_stop = win32event.CreateEvent(None, 0, 0, None)

def __init__(self, args):

win32serviceutil.ServiceFramework.__init__(self, args)

current_dir = os.path.dirname(os.path.realpath(__file__))
# py2exe package
# current_dir should be somthing like
# C:\Program Files(x86)\Datadog\Datadog Agent\dist\library.zip\win32s
self.agent_path = os.path.join(current_dir, '..', '..', '..', 'agent')

self.agent_path = os.path.normpath(self.agent_path)
logging.debug("Agent path: {0}".format(self.agent_path))
self.proc = None
# Watch JMXFetch restarts
self._MAX_JMXFETCH_RESTARTS = 3
self._agent_supervisor = AgentSupervisor()

def SvcStop(self):
""" Called when Windows wants to stop the service """
# Not even started
if self.proc is None:
logging.info('Supervisor was not yet started, stopping now.')
# Started, and died
elif not self.proc.is_running():
logging.info('Supervisor already exited. Some processes may still be alive. Stopping now.')
# Still running
else:
logging.info("Stopping Supervisor...")
# Soft termination based on TCP sockets to handle communication between the service
# layer and the Windows Supervisor
supervisor_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
supervisor_sock.connect(('localhost', 9001))

supervisor_sock.send("die".encode())

rlist, wlist, xlist = select.select([supervisor_sock], [], [], 15)
if not rlist:
# Ok some processes didn't want to die apparently, let's take care og them the hard
# way !
logging.warning("Some processes are still alive. Killing them.")
parent = psutil.Process(self.proc.pid)
children = parent.children(recursive=True)

for p in [parent] + children:
p.kill()
# Stop all services.
self.running = False
log.info('Stopping service...')
self._agent_supervisor.stop()

logging.info("Supervisor and its children processes exited, stopping now.")

# We can sleep quietly now
win32event.SetEvent(self.h_wait_stop)
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)

def SvcDoRun(self):
log.info('Starting service...')
servicemanager.LogMsg(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a similar message for SvcStop ?

servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STARTED,
(self._svc_name_, '')
)
self.start_ts = time.time()

# Let's start our components and put our Supervisor's output into the
# appropriate log file. This program also logs a minimalistic set of
# lines in the same supervisor.log.
self._agent_supervisor.run()

logging.info("Starting Supervisor.")
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, '')
)
log.info("Service stopped.")


class AgentSupervisor(object):
devnull = None

def __init__(self):
AgentSupervisor.devnull = open(os.devnull, 'w')

config = get_config(parse_args=False)

# Let's have an uptime counter
self.start_ts = None

# Watch JMXFetch restarts
self._MAX_JMXFETCH_RESTARTS = 3
self._count_jmxfetch_restarts = 0

# This file is somewhere in the dist directory of the agent
file_dir = os.path.dirname(os.path.realpath(__file__))
search_dir, current_dir = os.path.split(file_dir)
# So we go all the way up to the dist directory to find the actual agent dir
while current_dir and current_dir != 'dist':
search_dir, current_dir = os.path.split(search_dir)

dd_dir = search_dir
# If we don't find it, we use the default
if not current_dir:
dd_dir = os.path.join('C:\\', 'Program Files', 'Datadog', 'Datadog Agent')

# cd to C:\Program Files\Datadog\Datadog Agent\agent
os.chdir(os.path.join(dd_dir, 'agent'))

# preparing a clean env for the agent processes
env = os.environ.copy()
if env.get('PYTHONPATH'):
del env['PYTHONPATH']
if env.get('PYTHONHOME'):
del env['PYTHONHOME']
if env['PATH'][-1] != ';':
env['PATH'] += ';'
log.info('env: %s', env)
env['PATH'] += "{};{};".format(os.path.join(dd_dir, 'bin'), os.path.join(dd_dir, 'embedded'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we also add embedded to the path?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because gohai requires python to be in the path (to get the python version), and python.exe is in embedded.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove it, we don't need this info in gohai anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually you can disregard my comment: we do the same path handling on Linux so let's do the same thing here



embedded_python = os.path.join(dd_dir, 'embedded', 'python.exe')
# Keep a list of running processes so we can start/end as needed.
# Processes will start started in order and stopped in reverse order.
self.procs = {
'forwarder': DDProcess(
"forwarder",
[embedded_python, "ddagent.py"],
env
),
'collector': DDProcess(
"collector",
[embedded_python, "agent.py", "foreground", "--use-local-forwarder"],
env
),
'dogstatsd': DDProcess(
"dogstatsd",
[embedded_python, "dogstatsd.py", "--use-local-forwarder"],
env,
enabled=config.get("use_dogstatsd", True)
),
'jmxfetch': DDProcess(
"jmxfetch",
[embedded_python, "jmxfetch.py"],
env,
max_restarts=self._MAX_JMXFETCH_RESTARTS
),
}

def stop(self):
# Stop all services.
log.info("Stopping the agent processes...")
self.running = False
for proc in self.procs.values():
proc.terminate()
AgentSupervisor.devnull.close()

# Let's log the uptime
if self.start_ts is None:
self.start_ts = time.time()
time.sleep(SERVICE_SLEEP_INTERVAL*2)
secs = int(time.time()-self.start_ts)
mins = int(secs/60)
hours = int(secs/3600)
log.info("Agent processes stopped.")
log.info("Uptime: {0} hours {1} minutes {2} seconds".format(hours, mins % 60, secs % 60))

def run(self):
self.start_ts = time.time()

# Start all services.
for proc in self.procs.values():
proc.start()

# Loop to keep the service running since all DD services are
# running in separate processes
self.running = True
while self.running:
# Restart any processes that might have died.
for name, proc in self.procs.iteritems():
if not proc.is_alive() and proc.is_enabled():
log.warning("%s has died. Restarting..." % name)
proc.restart()

time.sleep(SERVICE_SLEEP_INTERVAL)


class DDProcess(object):
"""
Starts and monitors a Datadog process.
Restarts when it exits until the limit set is reached.
"""
DEFAULT_MAX_RESTARTS = 5
_RESTART_TIMEFRAME = 3600

def __init__(self, name, command, env, enabled=True, max_restarts=None):
self._name = name
self._command = command
self._env = env.copy()
self._enabled = enabled
self._proc = None
self._restarts = deque([])
self._max_restarts = max_restarts or self.DEFAULT_MAX_RESTARTS

def start(self):
if self.is_enabled():
log.info("Starting %s.", self._name)
self._proc = psutil.Popen(self._command, stdout=AgentSupervisor.devnull, stderr=AgentSupervisor.devnull, env=self._env)
else:
log.info("%s is not enabled, not starting it.", self._name)

# Since we don't call terminate here, the execution of the supervisord
# will be performed in a non blocking way. If an error is triggered
# here, tell windows we're closing the service and report accordingly
try:
logging.debug('Changing working directory to "%s".', self.agent_path)
os.chdir(self.agent_path)
def stop(self):
if self._proc is not None and self._proc.is_running():
log.info("Stopping %s...", self._name)
self._proc.terminate()

# This allows us to use the system's Python in case there is no embedded python
embedded_python = os.path.normpath(
os.path.join(self.agent_path, '..', 'embedded', 'python.exe')
)
if not os.path.isfile(embedded_python):
embedded_python = "python"
psutil.wait_procs([self._proc], timeout=3)

if self._proc.is_running():
log.debug("%s didn't exit. Killing it.", self._name)
self._proc.kill()

log.info("%s is stopped.", self._name)
else:
log.info('%s was not running.', self._name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem useful.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched it to debug. I think it's still useful because it tells us the service didn't find the process. (more obvious than an absence of log)


def terminate(self):
self.stop()

def is_alive(self):
return self._proc is not None and self._proc.is_running()

def is_enabled(self):
return self._enabled

self.proc = psutil.Popen([embedded_python, "windows_supervisor.py", "start", "server"])
except Exception:
logging.exception("Error when launching Supervisor")
self.SvcStop()
def _can_restart(self):
now = time.time()
while(self._restarts and self._restarts[0] < now - self._RESTART_TIMEFRAME):
self._restarts.popleft()

return len(self._restarts) < self._max_restarts

def restart(self):
if not self._can_restart():
log.error(
"{0} reached the limit of restarts ({1} tries during the last {2}s"
" (max authorized: {3})). Not restarting."
.format(self._name, len(self._restarts),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not use .format() here. (I wrote this line :p)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--> there are multiple occurrences of this.

self._RESTART_TIMEFRAME, self._max_restarts)
)
self._enabled = False
return

logging.info("Supervisor started.")
self._restarts.append(time.time())

# Let's wait for our user to send a sigkill. We can't have RunSvc exit
# before we actually kill our subprocess (the while True is just a
# paranoia check in case win32event.INFINITE isn't really... infinite)
while True:
rc = win32event.WaitForSingleObject(self.h_wait_stop, win32event.INFINITE)
if rc == win32event.WAIT_OBJECT_0:
logging.info("Service stop requested.")
break
if self.is_alive():
self.stop()

servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, '')
)
logging.info("Service stopped.")
self.start()


class JMXFetchProcess(DDProcess):
def start(self):
if self.is_enabled():
JMXFiles.clean_exit_file()
super(JMXFetchProcess, self).start()

def stop(self):
"""
Override `stop` method to properly exit JMXFetch.
"""
if self._proc is not None and self._proc.is_running():
JMXFiles.write_exit_file()
super(JMXFetchProcess, self).stop()


if __name__ == '__main__':
# handle install, start, stop and uninstall
win32serviceutil.HandleCommandLine(AgentService)
multiprocessing.freeze_support()
win32serviceutil.HandleCommandLine(AgentSvc)
Loading