Skip to content

Commit

Permalink
Merge pull request #2322 from DataDog/yann/watchdog-frenesy-detection
Browse files Browse the repository at this point in the history
[core] detect forwarder pathological activity
  • Loading branch information
yannmh committed Apr 29, 2016
2 parents 305f9f0 + 0639b51 commit 3ae5f9d
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 14 deletions.
16 changes: 13 additions & 3 deletions ddagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,14 @@

DD_ENDPOINT = "dd_url"

# Transactions
TRANSACTION_FLUSH_INTERVAL = 5000 # Every 5 seconds

# Watchdog settings
WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval
WATCHDOG_HIGH_ACTIVITY_THRESHOLD = 1000 # Threshold to detect pathological activity

# Misc
HEADERS_TO_REMOVE = [
'Host',
'Content-Length',
Expand All @@ -79,7 +85,7 @@
# Maximum queue size in bytes (when this is reached, old messages are dropped)
MAX_QUEUE_SIZE = 30 * 1024 * 1024 # 30MB

THROTTLING_DELAY = timedelta(microseconds=1000000/2) # 2 msg/second
THROTTLING_DELAY = timedelta(microseconds=1000000 / 2) # 2 msg/second


class EmitterThread(threading.Thread):
Expand Down Expand Up @@ -404,10 +410,14 @@ def __init__(self, port, agentConfig, watchdog=True,
if self.skip_ssl_validation:
log.info("Skipping SSL hostname validation, useful when using a transparent proxy")

# Monitor activity
if watchdog:
watchdog_timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER / 1000
self._watchdog = Watchdog(watchdog_timeout,
max_mem_mb=agentConfig.get('limit_memory_consumption', None))
self._watchdog = Watchdog(
watchdog_timeout,
max_mem_mb=agentConfig.get('limit_memory_consumption', None),
max_resets=WATCHDOG_HIGH_ACTIVITY_THRESHOLD
)

def log_request(self, handler):
""" Override the tornado logging method.
Expand Down
57 changes: 53 additions & 4 deletions tests/core/test_watchdog.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# stdlib
import os
from contextlib import contextmanager
from random import random, randrange
import os
import subprocess
import sys
import time
import unittest
import urllib as url

# 3p
from mock import patch
from nose.plugins.attrib import attr

# project
Expand All @@ -17,15 +19,62 @@
from util import Watchdog


class WatchdogKill(Exception):
    """
    Raised when the watchdog tries to kill the current process.

    Used as the `side_effect` of the patched `Watchdog.self_destruct`, so a
    kill attempt surfaces as a catchable exception instead of a real SIGKILL.
    """


@attr(requires='core_integration')
class TestWatchdog(unittest.TestCase):
"""Test watchdog in various conditions
"""

Test watchdog in various conditions
"""
JITTER_FACTOR = 2

@contextmanager
def set_time(self, time):
    """
    Context manager freezing the current time as seen by the `util` module.

    While the `with` body runs, `util.time.time()` returns `time`.
    The patch is always removed on exit, including when the body raises
    (e.g. a failing assertion), so a frozen clock never leaks into other tests.

    :param time: value returned by `util.time.time()` inside the block
    """
    # NOTE(review): "util.time.time" presumably resolves to the stdlib
    # `time.time`, in which case the clock is frozen process-wide -- confirm.
    with patch("util.time.time") as mock_time:
        mock_time.return_value = time
        yield

@patch.object(Watchdog, 'self_destruct', side_effect=WatchdogKill)
def test_watchdog_frenesy_detection(self, mock_restarted):
    """
    Watchdog restarts the process on suspiciously high activity.
    """
    import signal

    # Each successful reset arms a real SIGALRM, and the handler installed
    # by `Watchdog.__init__` is the mocked `self_destruct`. Cancel the pending
    # alarm and restore the previous handler so nothing fires after the test.
    previous_handler = signal.getsignal(signal.SIGALRM)
    if previous_handler is not None:
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)
    self.addCleanup(signal.alarm, 0)

    # Limit the restart timeframe for test purpose; patched rather than
    # assigned so the class attribute is restored when the test ends.
    with patch.object(Watchdog, '_RESTART_TIMEFRAME', 1):
        # Create a watchdog with a low activity tolerance
        process_watchdog = Watchdog(10, max_resets=3)
        ping_watchdog = process_watchdog.reset

        with self.set_time(1):
            # Can be reset 3 times within the watchdog timeframe
            for _ in range(3):
                ping_watchdog()

            # On the 4th attempt, the watchdog detects a suspiciously high activity
            self.assertRaises(WatchdogKill, ping_watchdog)

        with self.set_time(3):
            # Gets back to normal when the activity timeframe expires.
            ping_watchdog()

def test_watchdog(self):
"""Verify that watchdog kills ourselves even when spinning
"""
Verify that watchdog kills ourselves even when spinning
Verify that watchdog kills ourselves when hanging
"""
start = time.time()
Expand Down
53 changes: 46 additions & 7 deletions util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under Simplified BSD License (see LICENSE)

# stdlib
from collections import deque
from hashlib import md5
import logging
import os
Expand Down Expand Up @@ -477,20 +478,27 @@ def get_instance_id(agentConfig):


class Watchdog(object):
"""Simple signal-based watchdog that will scuttle the current process
if it has not been reset every N seconds, or if the processes exceeds
a specified memory threshold.
"""
Simple signal-based watchdog. Restarts the process when:
* no reset was made for more than a specified duration
* (optional) a specified memory threshold is exceeded
* (optional) a suspicious high activity is detected, i.e. too many resets for a given timeframe.
**Warning**: Not thread-safe.
Can only be invoked once per process, so don't use with multiple threads.
If you instantiate more than one, you're also asking for trouble.
"""
def __init__(self, duration, max_mem_mb = None):
# Activity history timeframe
_RESTART_TIMEFRAME = 60

def __init__(self, duration, max_mem_mb=None, max_resets=None):
import resource

#Set the duration
# Set the duration
self._duration = int(duration)
signal.signal(signal.SIGALRM, Watchdog.self_destruct)

# cap memory usage
# Set memory usage threshold
if max_mem_mb is not None:
self._max_mem_kb = 1024 * max_mem_mb
max_mem_bytes = 1024 * self._max_mem_kb
Expand All @@ -499,23 +507,54 @@ def __init__(self, duration, max_mem_mb = None):
else:
self.memory_limit_enabled = False

# Set high activity monitoring
self._restarts = deque([])
self._max_resets = max_resets

@staticmethod
def self_destruct(signum, frame):
    """
    Log what the process was doing, then kill it with SIGKILL.

    Has the `(signum, frame)` signature of a signal handler; it is also
    called directly with the caller's frame. The original note says the
    process "will be eventually restarted" -- presumably by an external
    supervisor, which is not visible from here.

    :param signum: signal number (unused)
    :param frame: frame that was executing when the watchdog fired, or None
    """
    try:
        import traceback
        log.error("Self-destructing...")
        if frame is not None:
            # There is usually no active exception here, so the useful
            # information is the stack of the interrupted code.
            log.error("".join(traceback.format_stack(frame)))
        if sys.exc_info()[0] is not None:
            # Only meaningful when invoked while an exception is being handled.
            log.error(traceback.format_exc())
    finally:
        # Kill unconditionally, even if logging itself failed.
        os.kill(os.getpid(), signal.SIGKILL)

def _is_frenetic(self):
"""
Detect suspicious high activity, i.e. the number of resets exceeds the maximum limit set
on the watchdog timeframe.
Flush old activity history
"""
now = time.time()
while(self._restarts and self._restarts[0] < now - self._RESTART_TIMEFRAME):
self._restarts.popleft()

return len(self._restarts) > self._max_resets

def reset(self):
    """
    Reset the watchdog: run the optional health checks, then re-arm the alarm.

    * (optional) self-destruct when the resident memory reported by `ps`
      exceeds 95% of the configured limit
    * (optional) record this reset and self-destruct when resets are too frequent
    * re-arm the alarm signal for `_duration` seconds
    """
    # Memory check: restart if too high as tornado will swallow MemoryErrors
    if self.memory_limit_enabled:
        ps_command = 'ps -p %d -o %s | tail -1' % (os.getpid(), 'rss')
        rss_kb = int(os.popen(ps_command).read())
        memory_ceiling_kb = 0.95 * self._max_mem_kb
        if rss_kb > memory_ceiling_kb:
            Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0))

    # Activity check: only when a maximum number of resets was configured
    if self._max_resets:
        self._restarts.append(time.time())
        if self._is_frenetic():
            Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0))

    # Re-arm the alarm signal
    log.debug("Resetting watchdog for %d" % self._duration)
    signal.alarm(self._duration)

Expand Down

0 comments on commit 3ae5f9d

Please sign in to comment.