From 2d94f48ed5343fb14a9925f0308910aed3944a5a Mon Sep 17 00:00:00 2001 From: Julien Rouhaud Date: Thu, 15 Mar 2018 19:42:27 +0100 Subject: [PATCH] Prototype of external deamon to process remote snapshot --- powa-remote.conf-dist | 15 +++++ powa-remote.py | 8 +++ powa_remote/__init__.py | 111 ++++++++++++++++++++++++++++++++ powa_remote/options.py | 7 +++ powa_remote/powa_worker.py | 126 +++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 6 files changed, 268 insertions(+) create mode 100644 powa-remote.conf-dist create mode 100755 powa-remote.py create mode 100644 powa_remote/__init__.py create mode 100644 powa_remote/options.py create mode 100644 powa_remote/powa_worker.py create mode 100644 requirements.txt diff --git a/powa-remote.conf-dist b/powa-remote.conf-dist new file mode 100644 index 0000000..2bb7586 --- /dev/null +++ b/powa-remote.conf-dist @@ -0,0 +1,15 @@ +{ + "repository": { + "dsn": "postgresql://powa_repo@localhost:5432/powa", + }, + "servers": { + "srv1": { + "dsn": "postgresql://user@localhost:5433/powa", + "frequency": 30; + }, + "srv2": { + "dsn": "postgresql://user@localhost:5434/powa", + "frequency": 60; + } + } +} diff --git a/powa-remote.py b/powa-remote.py new file mode 100755 index 0000000..9146ea4 --- /dev/null +++ b/powa-remote.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +from powa_remote import PowaRemote +import logging + +#app = PowaRemote() +app = PowaRemote(loglevel=logging.DEBUG) +app.main() diff --git a/powa_remote/__init__.py b/powa_remote/__init__.py new file mode 100644 index 0000000..50516b5 --- /dev/null +++ b/powa_remote/__init__.py @@ -0,0 +1,111 @@ +""" +powa-remote main application. +""" + +from powa_remote.options import parse_options +from powa_remote.powa_worker import PowaThread +import time +import logging +import signal + +__VERSION__ = '0.0.1' +__VERSION_NUM__ =[int(part) for part in __VERSION__.split('.')] + +from powa_remote.options import parse_options + +class PowaRemote(): + def __init__(self, loglevel = None): + self.workers = {} + self.logger = logging.getLogger("powa-remote") + self.stopping = False; + + if (loglevel is not None): + loglevel = loglevel + else: + loglevel = logging.INFO + + extra = {'threadname': '-'} + logging.basicConfig(format='%(asctime)s %(threadname)s: %(message)s ', level=loglevel) + self.logger = logging.LoggerAdapter(self.logger, extra) + signal.signal(signal.SIGHUP, self.sighandler) + signal.signal(signal.SIGTERM, self.sighandler) + + def main(self): + self.config = parse_options() + self.logger.info("Starting powa-remote...") + + for s in self.config["servers"]: + self.register_worker(s, self.config["repository"], self.config["servers"][s]) + + try: + while (not self.stopping): + time.sleep(1) + except KeyboardInterrupt: + self.logger.debug("KeyboardInterrupt caught") + self.logger.info("Stopping all workers and exiting...") + self.stop_all_workers() + + def register_worker(self, name, repository, config): + self.workers[name] = PowaThread(name, repository, config) + #self.workers[s].daemon = True + self.workers[name].start() + + def stop_all_workers(self): + for k in self.workers: + self.workers[k].ask_to_stop() + + def sighandler(self, signum, frame): + if (signum == signal.SIGHUP): + self.logger.debug("SIGHUP caught") + self.reload_conf() + elif (signum == signal.SIGTERM): + self.logger.debug("SIGTERM caught") + self.stop_all_workers() + self.stopping = True + else: + self.logger.error("Unhandled signal %d" % signum); + + def reload_conf(self): + self.logger.info('Reloading...') + config_new = parse_options() + + # check for removed servers + for k in self.workers: + if (self.workers[k].isAlive()): + continue + + if (self.workers[k].is_stopping()): + self.logger.warn("Oops") + + if (k not in config_new["servers"]): + self.logger.info("%s has been removed, stopping it..." % k) + self.workers[k].ask_to_stop() + + # check for added servers + for k in config_new["servers"]: + if (k not in self.workers or not self.workers[k].isAlive()): + self.logger.info("%s has been added, registering it..." % k) + self.register_worker(k, config_new["repository"], config_new["servers"][k]) + + # check for updated configuration + for k in config_new["servers"]: + cur = config_new["servers"][k] + if (not conf_are_equal(cur, self.workers[k].get_config())): + self.workers[k].ask_reload(cur) + + self.config = config_new + +def conf_are_equal(conf1, conf2): + for k in conf1.keys(): + if (k not in conf2): + return False + if (conf1[k] != conf2[k]): + return False + + for k in conf2.keys(): + if (k not in conf1): + return False + if (conf1[k] != conf2[k]): + return False + + return True diff --git a/powa_remote/options.py b/powa_remote/options.py new file mode 100644 index 0000000..3de6b65 --- /dev/null +++ b/powa_remote/options.py @@ -0,0 +1,7 @@ +import json + +def parse_options(): + return parse_file('./powa-remote.conf') + +def parse_file(filepath): + return json.load(open(filepath)) diff --git a/powa_remote/powa_worker.py b/powa_remote/powa_worker.py new file mode 100644 index 0000000..ae32053 --- /dev/null +++ b/powa_remote/powa_worker.py @@ -0,0 +1,126 @@ +import threading +import time +import calendar +import psycopg2 +import logging + +class PowaThread (threading.Thread): + def __init__(self, name, repository, config): + threading.Thread.__init__(self) + self.__stopping = threading.Event() + self.__got_sighup = threading.Event() + self.__connected = threading.Event() + self.name = name + self.__repository = repository + self.__config = config + self.__pending_config = None + self.__remote_conn = None + self.__repo_conn = None; + self.logger = logging.getLogger("powa-remote") + self.last_time = None + + extra = {'threadname': self.name} + self.logger = logging.LoggerAdapter(self.logger, extra) + + def __check_powa(self): + if (self.__remote_conn is not None): + cur = self.__remote_conn.cursor() + cur.execute("SELECT COUNT(*) FROM pg_extension WHERE extname = 'powa'") + res = cur.fetchone() + cur.close() + + if (res[0] != 1): + self.logger.error("PoWA extension not found") + self.__disconnect() + self.__stopping.set() + + def __reload(self): + self.logger.info("Reloading configuration") + self.__config = self.__pending_config + self.__pending_config = None + self.__disconnect() + self.__connect() + self.__got_sighup.clear() + + def __connect(self): + if ('dsn' not in self.__repository or 'dsn' not in self.__config): + self.logger.error("Missing connection info") + self.__stopping.set() + return + + try: + self.logger.debug("Connecting on repository...") + self.__repo_conn = psycopg2.connect(self.__repository['dsn']) + self.logger.debug("Connected.") + + self.logger.debug("Connecting on remote database...") + self.__remote_conn = psycopg2.connect(self.__config['dsn']) + self.logger.debug("Connected.") + self.__connected.set() + except: + self.logger.error("Error connecting") + self.__disconnect() + self.__stopping.set() + + def __disconnect(self): + if (self.__remote_conn is not None): + self.logger.info("Disconnecting from remote server") + self.__remote_conn.close() + self.__remote_conn = None + if (self.__repo_conn is not None): + self.logger.info("Disconnecting from repository") + self.__repo_conn.close() + self.__repo_conn = None + self.__connected.clear() + + def __disconnect_and_exit(self): + self.__disconnect() + self.logger.info("stopped") + self.__stopping.clear() + + def __worker_main(self): + self.__connect() + self.last_time = calendar.timegm(time.gmtime()) + self.__check_powa() + while (not self.__stopping.isSet()): + cur_time = calendar.timegm(time.gmtime()) + if (self.__got_sighup.isSet()): + self.__reload() + + if ((cur_time - self.last_time) >= self.__config["frequency"]): + self.__take_snapshot() + self.last_time = calendar.timegm(time.gmtime()) + time.sleep(0.1) + + self.__disconnect_and_exit() + + def __take_snapshot(self): + cur = self.__remote_conn.cursor() + cur.execute("""SELECT function_name FROM powa_functions + WHERE operation = 'snapshot' AND enabled""") + rows = cur.fetchall() + cur.close() + + for f in rows: + self.logger.debug("Should call %s()" % f[0]) + + def is_stopping(self): + return self.__stopping.isSet() + + def get_config(self): + return self.__config + + def ask_to_stop(self): + self.__stopping.set() + self.logger.info("Asked to stop...") + + def run(self): + if (not self.__stopping.isSet()): + self.logger.info("Starting worker") + self.__worker_main() + + def ask_reload(self, new_config): + self.logger.debug("Reload asked") + self.__pending_config = new_config + self.__got_sighup.set() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..658130b --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +psycopg2