From f34557aa455efd87da9bc1822b84fa0b0fb95e48 Mon Sep 17 00:00:00 2001 From: Jose Galarza Date: Wed, 30 Aug 2017 00:14:47 +0100 Subject: [PATCH] Graceful shutdown of the worker Thanks @grvhi for the implementation --- sqjobs/worker.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sqjobs/worker.py b/sqjobs/worker.py index b602ae8..876e608 100644 --- a/sqjobs/worker.py +++ b/sqjobs/worker.py @@ -1,3 +1,4 @@ +import signal import sys import traceback @@ -16,6 +17,10 @@ def __init__(self, broker, queue_name, timeout=None): self.timeout = timeout or self.DEFAULT_TIMEOUT self.registered_jobs = {} self.exception_handlers = [] + self._shutting_down = False + + signal.signal(signal.SIGINT, self._exit_gracefully) + signal.signal(signal.SIGTERM, self._exit_gracefully) def __repr__(self): return 'Worker({connector})'.format( @@ -43,6 +48,9 @@ def run(self): logger.info('Running worker, %d jobs registered...', len(self.registered_jobs)) for payload in self.broker.jobs(self.queue_name, self.timeout): + if self._shutting_down: + break + try: job_class = self.registered_jobs.get(payload['name']) @@ -95,3 +103,7 @@ def _handle_exception(self, job, args, kwargs, *exc_info): for handler in reversed(self.exception_handlers): logger.debug('Executing exception handler %s', handler) handler(job, args, kwargs, *exc_info) + + def _exit_gracefully(self, signum, frame): + logger.info('Shutting down the worker...') + self._shutting_down = True