Skip to content

Commit

Permalink
worker: finish all opened tasks and reexec on SIGHUP
Browse files Browse the repository at this point in the history
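This feature implements safe reloading of updated configuration or updated
source code, e.g. with `systemctl reload`: on SIGHUP the worker stops
accepting new tasks, finishes all opened tasks, and then re-executes itself.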

The reexec is not performed when SIGHUP is followed by SIGTERM, SIGINT,
or an explicit shutdown by the `ShutdownWorker` task.

The unit tests must use `mock.ANY` instead of the actual signal handler
because it is impossible to reference a nested closure outside of its own
scope.
  • Loading branch information
lzaoral committed Mar 15, 2024
1 parent 022ba22 commit 7245924
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 18 deletions.
33 changes: 25 additions & 8 deletions kobo/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,9 @@
set_except_hook()


def daemon_shutdown(*args, **kwargs):
    """Request daemon termination by raising ShutdownException.

    All positional and keyword arguments are accepted and ignored, which
    lets this function be registered directly as a signal handler (it is
    installed for SIGTERM by main_loop).
    """
    shutdown_request = ShutdownException()
    raise shutdown_request


def main_loop(conf, foreground=False, task_manager_class=None):
"""infinite daemon loop"""

# define custom signal handlers
signal.signal(signal.SIGTERM, daemon_shutdown)

# initialize TaskManager
try:
log_file = conf.get("LOG_FILE", None)
Expand All @@ -49,6 +42,22 @@ def main_loop(conf, foreground=False, task_manager_class=None):
if foreground and tm._logger is not None:
kobo.log.add_stderr_logger(tm._logger)

# define other signal handlers
def sigterm_handler(*_):
    # SIGTERM handler; the arguments passed by the signal machinery are
    # ignored.
    # Clear the re-exec flag first so that a SIGTERM arriving after a SIGHUP
    # cancels the pending restart, then raise ShutdownException, which the
    # main loop catches together with KeyboardInterrupt.
    tm.reexec = False
    raise ShutdownException()
signal.signal(signal.SIGTERM, sigterm_handler)

# reload the worker on SIGHUP
def sighup_handler(*_):
    # SIGHUP handler; the arguments passed by the signal machinery are
    # ignored.  It only marks the task manager for a restart: the actual
    # re-exec happens in main_loop, which checks `tm.reexec` and then calls
    # os.execvp().
    # do not accept new tasks
    # NOTE(review): the ShutdownWorker task locks the manager by assigning
    # `task_manager.locked = True` directly -- confirm that TaskManager
    # really provides a lock() method with the same effect.
    tm.lock()
    tm.reexec = True
signal.signal(signal.SIGHUP, sighup_handler)

# reset SIGINT to default handler
signal.signal(signal.SIGINT, signal.default_int_handler)

while 1:
try:
tm.log_debug(80 * '-')
Expand All @@ -65,7 +74,11 @@ def main_loop(conf, foreground=False, task_manager_class=None):
# sleep for some time
tm.sleep()

except (ShutdownException, KeyboardInterrupt):
except (ShutdownException, KeyboardInterrupt) as e:
# do not reexec on SIGINT
if isinstance(e, KeyboardInterrupt):
tm.reexec = False

# ignore keyboard interrupts and sigterm
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
Expand All @@ -81,6 +94,10 @@ def main_loop(conf, foreground=False, task_manager_class=None):
tm.log_error(traceback.get_traceback())
tm.sleep()

if tm.reexec:
tm.log_info('Restarting: %s', sys.argv)
os.execvp(sys.argv[0], sys.argv)


def main(conf, argv=None, task_manager_class=None):
parser = optparse.OptionParser()
Expand Down
1 change: 1 addition & 0 deletions kobo/worker/taskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def __init__(self, conf, logger=None, **kwargs):
self.task_dict = {} # { task_id: { task information obtained from self.hub.get_worker_tasks() } }

self.locked = False # if task manager is locked, it waits until tasks finish and exits
self.reexec = False # if the worker should be restarted after it finishes

self.task_container = TaskContainer()

Expand Down
1 change: 1 addition & 0 deletions kobo/worker/tasks/task_shutdown_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ def run(self):

# lock the task manager and let it terminate all tasks
self.task_manager.locked = True
self.task_manager.reexec = False
27 changes: 17 additions & 10 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import six
import unittest

from mock import Mock, patch, call
from mock import ANY, Mock, patch, call

from kobo.exceptions import ShutdownException
from kobo.worker import main
Expand All @@ -25,6 +25,7 @@ def get_next_task_mock():
if self.fail:
raise Exception('This task always fails.')

self.reexec = False
self.run_count = 0
self.max_runs = conf.get('max_runs', 1)
self.fail = conf.get('fail', False)
Expand Down Expand Up @@ -54,10 +55,6 @@ def wrap(conf, logger):
return self.task_manager
return wrap

def test_daemon_shutdown(self):
    """Calling main.daemon_shutdown() without arguments raises ShutdownException."""
    self.assertRaises(ShutdownException, main.daemon_shutdown)

def test_main_loop_task_manager_exception(self):
with patch('kobo.worker.main.TaskManager', Mock(side_effect=ValueError)) as mock_tm:
with patch.object(main.signal, 'signal') as signal_mock:
Expand All @@ -81,7 +78,9 @@ def test_main_loop_task_exception(self):
}, foreground=False)

signal_mock.assert_has_calls([
call(signal.SIGTERM, main.daemon_shutdown),
call(signal.SIGTERM, ANY),
call(signal.SIGHUP, ANY),
call(signal.SIGINT, signal.default_int_handler),
call(signal.SIGINT, signal.SIG_IGN),
call(signal.SIGTERM, signal.SIG_IGN),
], any_order=False)
Expand All @@ -107,7 +106,9 @@ def test_main_loop(self):
}, foreground=False)

signal_mock.assert_has_calls([
call(signal.SIGTERM, main.daemon_shutdown),
call(signal.SIGTERM, ANY),
call(signal.SIGHUP, ANY),
call(signal.SIGINT, signal.default_int_handler),
call(signal.SIGINT, signal.SIG_IGN),
call(signal.SIGTERM, signal.SIG_IGN),
], any_order=False)
Expand All @@ -132,7 +133,9 @@ def test_main_loop_in_foreground(self):
}, foreground=True)

signal_mock.assert_has_calls([
call(signal.SIGTERM, main.daemon_shutdown),
call(signal.SIGTERM, ANY),
call(signal.SIGHUP, ANY),
call(signal.SIGINT, signal.default_int_handler),
call(signal.SIGINT, signal.SIG_IGN),
call(signal.SIGTERM, signal.SIG_IGN),
], any_order=False)
Expand Down Expand Up @@ -167,7 +170,9 @@ def test_main_loop_with_file_logger(self):
log_mock.add_stderr_logger.assert_not_called()

signal_mock.assert_has_calls([
call(signal.SIGTERM, main.daemon_shutdown),
call(signal.SIGTERM, ANY),
call(signal.SIGHUP, ANY),
call(signal.SIGINT, signal.default_int_handler),
call(signal.SIGINT, signal.SIG_IGN),
call(signal.SIGTERM, signal.SIG_IGN),
], any_order=False)
Expand Down Expand Up @@ -202,7 +207,9 @@ def test_main_loop_foreground_with_file_logger(self):
log_mock.add_stderr_logger.assert_called_once_with(self.task_manager._logger)

signal_mock.assert_has_calls([
call(signal.SIGTERM, main.daemon_shutdown),
call(signal.SIGTERM, ANY),
call(signal.SIGHUP, ANY),
call(signal.SIGINT, signal.default_int_handler),
call(signal.SIGINT, signal.SIG_IGN),
call(signal.SIGTERM, signal.SIG_IGN),
], any_order=False)
Expand Down

0 comments on commit 7245924

Please sign in to comment.