Skip to content

Commit

Permalink
Implement remote snapshot.
Browse files Browse the repository at this point in the history
This relies on infrastructure added by PoWA 4.

The collector now get the list of datasources from the repository for the given
instance, export each datasource's data from the remote instance to a
repository table, and call powa_take_snapshot for the instance.

Some basic logging and reporting is also added.
  • Loading branch information
rjuju committed Apr 1, 2019
1 parent a1d16c7 commit b6a17c2
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 177 deletions.
File renamed without changes.
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Overview
========

This repository contains the `powa-collector` tool, a simple multi-threaded
python program that performs the snapshots for all the remote servers
configured in a powa repository database (in the **powa_servers** table).

Requirements
============

This program requires python 2.7 or python 3.

The required dependencies are listed in the **requirements.txt** file.

Configuration
=============

Copy the provided `powa-collector.conf-dist` file to a new `powa-collector.conf`
file, and adapt the **dsn** specification to be able to connect to the wanted
main PoWA repository.

Usage
=====

To start the program, simply run the powa-collector.py program. A `SIGTERM` or a
`Keyboard Interrupt` on the program will cleanly stop all the thread and exit
the program. A `SIGHUP` will reload the configuration.
5 changes: 5 additions & 0 deletions powa-collector.conf-dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"repository": {
"dsn": "postgresql://powa_user@localhost:5432/powa"
}
}
8 changes: 8 additions & 0 deletions powa-collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python

from powa_collector import PowaCollector
import logging

# app = PowaCollector()
app = PowaCollector(loglevel=logging.DEBUG)
app.main()
15 changes: 0 additions & 15 deletions powa-remote.conf-dist

This file was deleted.

8 changes: 0 additions & 8 deletions powa-remote.py

This file was deleted.

91 changes: 70 additions & 21 deletions powa_remote/__init__.py → powa_collector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,88 @@
"""
powa-remote main application.
powa-collector main application.
"""

from powa_remote.options import parse_options
from powa_remote.powa_worker import PowaThread
from powa_collector.options import (parse_options, get_full_config,
add_servers_config)
from powa_collector.powa_worker import PowaThread
import psycopg2
import time
import logging
import signal

__VERSION__ = '0.0.1'
__VERSION_NUM__ =[int(part) for part in __VERSION__.split('.')]
__VERSION_NUM__ = [int(part) for part in __VERSION__.split('.')]

from powa_remote.options import parse_options

class PowaRemote():
def __init__(self, loglevel = logging.INFO):
class PowaCollector():
def __init__(self, loglevel=logging.INFO):
self.workers = {}
self.logger = logging.getLogger("powa-remote")
self.stopping = False;
self.logger = logging.getLogger("powa-collector")
self.stopping = False

extra = {'threadname': '-'}
logging.basicConfig(format='%(asctime)s %(threadname)s: %(message)s ', level=loglevel)
logging.basicConfig(
format='%(asctime)s %(threadname)s %(levelname)-6s: %(message)s ',
level=loglevel)
self.logger = logging.LoggerAdapter(self.logger, extra)
signal.signal(signal.SIGHUP, self.sighandler)
signal.signal(signal.SIGTERM, self.sighandler)

def connect(self, options):
try:
self.logger.debug("Connecting on repository...")
self.__repo_conn = psycopg2.connect(options["repository"]['dsn'])
self.logger.debug("Connected.")
cur = self.__repo_conn.cursor()
cur.execute("SET application_name = %s",
('PoWA collector - main thread'
+ ' (' + __VERSION__ + ')', ))
cur.execute("LISTEN powa_collector")
cur.close()
except psycopg2.Error as e:
self.__repo_conn = None
self.logger.error("Error connecting:\n%s", e)
return False

return True

def main(self):
self.config = parse_options()
self.logger.info("Starting powa-remote...")
raw_options = parse_options()
self.logger.info("Starting powa-collector...")

if (not self.connect(raw_options)):
exit(1)

self.config = add_servers_config(self.__repo_conn, raw_options)

for k, conf in self.config["servers"].items():
self.register_worker(k, self.config["repository"], conf)

self.list_workers()

try:
while (not self.stopping):
time.sleep(1)
if (self.__repo_conn is not None):
try:
cur = self.__repo_conn.cursor()
cur.execute("SELECT 1")
cur.close()
except Exception:
self.__repo_conn = None
self.logger.warning("Connection was dropped!")

if (not self.__repo_conn):
self.connect(raw_options)

time.sleep(10)
except KeyboardInterrupt:
self.logger.debug("KeyboardInterrupt caught")
self.logger.info("Stopping all workers and exiting...")
self.stop_all_workers()

def register_worker(self, name, repository, config):
self.workers[name] = PowaThread(name, repository, config)
#self.workers[s].daemon = True
# self.workers[s].daemon = True
self.workers[name].start()

def stop_all_workers(self):
Expand All @@ -58,23 +98,29 @@ def sighandler(self, signum, frame):
self.stop_all_workers()
self.stopping = True
else:
self.logger.error("Unhandled signal %d" % signum);
self.logger.error("Unhandled signal %d" % signum)

def reload_conf(self):
def list_workers(self):
self.logger.info('List of workers:')
for k, worker in self.workers.items():
self.logger.info(" %s%s" % (k, "" if (worker.isAlive()) else
" (stopped)"))
# self.logger.info(" %s%s" % (k, "" if (worker.isAlive()) else
# " (stopped)"))
self.logger.info("%r%s" % (worker, "" if (worker.isAlive()) else
" (stopped)"))

def reload_conf(self):
self.list_workers()

self.logger.info('Reloading...')
config_new = parse_options()
config_new = get_full_config(self.__repo_conn)

# check for removed servers
for k, worker in self.workers.items():
if (worker.isAlive()):
continue

if (worker.is_stopping()):
self.logger.warn("Oops")
self.logger.warning("Oops")

if (k not in config_new["servers"]):
self.logger.info("%s has been removed, stopping it..." % k)
Expand All @@ -84,7 +130,8 @@ def reload_conf(self):
for k in config_new["servers"]:
if (k not in self.workers or not self.workers[k].isAlive()):
self.logger.info("%s has been added, registering it..." % k)
self.register_worker(k, config_new["repository"], config_new["servers"][k])
self.register_worker(k, config_new["repository"],
config_new["servers"][k])

# check for updated configuration
for k in config_new["servers"]:
Expand All @@ -93,6 +140,8 @@ def reload_conf(self):
self.workers[k].ask_reload(cur)

self.config = config_new
self.logger.info('Reload done')


def conf_are_equal(conf1, conf2):
for k in conf1.keys():
Expand Down
46 changes: 46 additions & 0 deletions powa_collector/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json


def get_full_config(conn):
return add_servers_config(conn, parse_options())


def add_servers_config(conn, config):
if ("servers" not in config):
config["servers"] = {}

cur = conn.cursor()
cur.execute("""
SELECT id, hostname, port, username, password, dbname,
frequency
FROM powa_servers s
WHERE s.id > 0
ORDER BY id
""")

for row in cur:
parms = {}
parms["host"] = row[1]
parms["port"] = row[2]
parms["user"] = row[3]
if (row[4] is not None):
parms["password"] = row[4]
parms["dbname"] = row[5]

key = row[1] + ':' + str(row[2])
config["servers"][key] = {}
config["servers"][key]["dsn"] = parms
config["servers"][key]["frequency"] = row[6]
config["servers"][key]["srvid"] = row[0]

conn.commit()

return config


def parse_options():
return parse_file('./powa-collector.conf')


def parse_file(filepath):
return json.load(open(filepath))
Loading

0 comments on commit b6a17c2

Please sign in to comment.