Skip to content

Commit

Permalink
Prototype of external deamon to process remote snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
rjuju committed Mar 29, 2019
1 parent 4360bb2 commit 2d94f48
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 0 deletions.
15 changes: 15 additions & 0 deletions powa-remote.conf-dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"repository": {
"dsn": "postgresql://powa_repo@localhost:5432/powa",
},
"servers": {
"srv1": {
"dsn": "postgresql://user@localhost:5433/powa",
"frequency": 30;
},
"srv2": {
"dsn": "postgresql://user@localhost:5434/powa",
"frequency": 60;
}
}
}
8 changes: 8 additions & 0 deletions powa-remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python

from powa_remote import PowaRemote
import logging

#app = PowaRemote()
app = PowaRemote(loglevel=logging.DEBUG)
app.main()
111 changes: 111 additions & 0 deletions powa_remote/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
powa-remote main application.
"""

from powa_remote.options import parse_options
from powa_remote.powa_worker import PowaThread
import time
import logging
import signal

__VERSION__ = '0.0.1'
__VERSION_NUM__ =[int(part) for part in __VERSION__.split('.')]

from powa_remote.options import parse_options

class PowaRemote():
def __init__(self, loglevel = None):
self.workers = {}
self.logger = logging.getLogger("powa-remote")
self.stopping = False;

if (loglevel is not None):
loglevel = loglevel
else:
loglevel = logging.INFO

extra = {'threadname': '-'}
logging.basicConfig(format='%(asctime)s %(threadname)s: %(message)s ', level=loglevel)
self.logger = logging.LoggerAdapter(self.logger, extra)
signal.signal(signal.SIGHUP, self.sighandler)
signal.signal(signal.SIGTERM, self.sighandler)

def main(self):
self.config = parse_options()
self.logger.info("Starting powa-remote...")

for s in self.config["servers"]:
self.register_worker(s, self.config["repository"], self.config["servers"][s])

try:
while (not self.stopping):
time.sleep(1)
except KeyboardInterrupt:
self.logger.debug("KeyboardInterrupt caught")
self.logger.info("Stopping all workers and exiting...")
self.stop_all_workers()

def register_worker(self, name, repository, config):
self.workers[name] = PowaThread(name, repository, config)
#self.workers[s].daemon = True
self.workers[name].start()

def stop_all_workers(self):
for k in self.workers:
self.workers[k].ask_to_stop()

def sighandler(self, signum, frame):
if (signum == signal.SIGHUP):
self.logger.debug("SIGHUP caught")
self.reload_conf()
elif (signum == signal.SIGTERM):
self.logger.debug("SIGTERM caught")
self.stop_all_workers()
self.stopping = True
else:
self.logger.error("Unhandled signal %d" % signum);

def reload_conf(self):
self.logger.info('Reloading...')
config_new = parse_options()

# check for removed servers
for k in self.workers:
if (self.workers[k].isAlive()):
continue

if (self.workers[k].is_stopping()):
self.logger.warn("Oops")

if (k not in config_new["servers"]):
self.logger.info("%s has been removed, stopping it..." % k)
self.workers[k].ask_to_stop()

# check for added servers
for k in config_new["servers"]:
if (k not in self.workers or not self.workers[k].isAlive()):
self.logger.info("%s has been added, registering it..." % k)
self.register_worker(k, config_new["repository"], config_new["servers"][k])

# check for updated configuration
for k in config_new["servers"]:
cur = config_new["servers"][k]
if (not conf_are_equal(cur, self.workers[k].get_config())):
self.workers[k].ask_reload(cur)

self.config = config_new

def conf_are_equal(conf1, conf2):
for k in conf1.keys():
if (k not in conf2):
return False
if (conf1[k] != conf2[k]):
return False

for k in conf2.keys():
if (k not in conf1):
return False
if (conf1[k] != conf2[k]):
return False

return True
7 changes: 7 additions & 0 deletions powa_remote/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import json

def parse_options():
return parse_file('./powa-remote.conf')

def parse_file(filepath):
return json.load(open(filepath))
126 changes: 126 additions & 0 deletions powa_remote/powa_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import threading
import time
import calendar
import psycopg2
import logging

class PowaThread (threading.Thread):
def __init__(self, name, repository, config):
threading.Thread.__init__(self)
self.__stopping = threading.Event()
self.__got_sighup = threading.Event()
self.__connected = threading.Event()
self.name = name
self.__repository = repository
self.__config = config
self.__pending_config = None
self.__remote_conn = None
self.__repo_conn = None;
self.logger = logging.getLogger("powa-remote")
self.last_time = None

extra = {'threadname': self.name}
self.logger = logging.LoggerAdapter(self.logger, extra)

def __check_powa(self):
if (self.__remote_conn is not None):
cur = self.__remote_conn.cursor()
cur.execute("SELECT COUNT(*) FROM pg_extension WHERE extname = 'powa'")
res = cur.fetchone()
cur.close()

if (res[0] != 1):
self.logger.error("PoWA extension not found")
self.__disconnect()
self.__stopping.set()

def __reload(self):
self.logger.info("Reloading configuration")
self.__config = self.__pending_config
self.__pending_config = None
self.__disconnect()
self.__connect()
self.__got_sighup.clear()

def __connect(self):
if ('dsn' not in self.__repository or 'dsn' not in self.__config):
self.logger.error("Missing connection info")
self.__stopping.set()
return

try:
self.logger.debug("Connecting on repository...")
self.__repo_conn = psycopg2.connect(self.__repository['dsn'])
self.logger.debug("Connected.")

self.logger.debug("Connecting on remote database...")
self.__remote_conn = psycopg2.connect(self.__config['dsn'])
self.logger.debug("Connected.")
self.__connected.set()
except:
self.logger.error("Error connecting")
self.__disconnect()
self.__stopping.set()

def __disconnect(self):
if (self.__remote_conn is not None):
self.logger.info("Disconnecting from remote server")
self.__remote_conn.close()
self.__remote_conn = None
if (self.__repo_conn is not None):
self.logger.info("Disconnecting from repository")
self.__repo_conn.close()
self.__repo_conn = None
self.__connected.clear()

def __disconnect_and_exit(self):
self.__disconnect()
self.logger.info("stopped")
self.__stopping.clear()

def __worker_main(self):
self.__connect()
self.last_time = calendar.timegm(time.gmtime())
self.__check_powa()
while (not self.__stopping.isSet()):
cur_time = calendar.timegm(time.gmtime())
if (self.__got_sighup.isSet()):
self.__reload()

if ((cur_time - self.last_time) >= self.__config["frequency"]):
self.__take_snapshot()
self.last_time = calendar.timegm(time.gmtime())
time.sleep(0.1)

self.__disconnect_and_exit()

def __take_snapshot(self):
cur = self.__remote_conn.cursor()
cur.execute("""SELECT function_name FROM powa_functions
WHERE operation = 'snapshot' AND enabled""")
rows = cur.fetchall()
cur.close()

for f in rows:
self.logger.debug("Should call %s()" % f[0])

def is_stopping(self):
return self.__stopping.isSet()

def get_config(self):
return self.__config

def ask_to_stop(self):
self.__stopping.set()
self.logger.info("Asked to stop...")

def run(self):
if (not self.__stopping.isSet()):
self.logger.info("Starting worker")
self.__worker_main()

def ask_reload(self, new_config):
self.logger.debug("Reload asked")
self.__pending_config = new_config
self.__got_sighup.set()

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
psycopg2

0 comments on commit 2d94f48

Please sign in to comment.