Skip to content

Commit

Permalink
Add a simple LISTEN/NOTIFY communication protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
rjuju committed Apr 2, 2019
1 parent b6a17c2 commit 77776c5
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 8 deletions.
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,48 @@ Usage
To start the program, simply run the powa-collector.py program. A `SIGTERM` or a
`Keyboard Interrupt` on the program will cleanly stop all the thread and exit
the program. A `SIGHUP` will reload the configuration.

Protocol
========

A minimal communication protocol is implented, using the LISTEN/NOTIFY facility
provided by postgres, which is used by the powa-web project. You can send
queries to collector by sending messages on the "powa_collector" channel. The
collector will send answers on the channel you specified, so make sure to
listen on it before sending any query to not miss answers.

The requests are of the following form:

COMMAND RESPONSE_CHANNEL OPTIONAL_ARGUMENTS

- COMMAND: mandatory argument describing the query. The following commands
are supported:

- RELOAD: reload the configuration and report that the main thread
successfully received the command. The reload will be attempted even
if no response channel was provided.

- WORKERS_STATUS: return a JSON (srvid is the key, status is the content)
describing the status of each remote server thread. Command is ignored
if no response channel was provided. This command accept an optional
argument to get the status of a single remote server, identified by its
srvid. If no worker exists for this server, an empty JSON will be
returned.

- RESPONSE_CHANNEL: mandatory argument to describe the NOTIFY channel the
client listens a response on. '-' can be used if no answer should be
sent.

- OPTIONAL_ARGUMENTS: space separated list of arguments, specific to the
underlying command.

The answers are of the form:

COMMAND STATUS DATA

- COMMAND: same as the command in the query

- STATUS: OK or KO.

- DATA: reason for the failure if status is KO, otherwise the data for the
answer.
129 changes: 122 additions & 7 deletions powa_collector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,40 @@
"""
powa-collector main application.
PowaCollector: powa-collector main application.
It takes a simple configuration file in json format, where repository.dsn
should point to an URI to connect to the repository server. The list of remote
servers and their configuration will be retrieved from this repository server.
It maintains a persistent dedicated connection to the repository server, for
monitoring and communication purpose. It also starts one thread per remote
server. These threads are kept in the "workers" dict attribute, with the key
being the textual identifier (host:port). See powa_worker.py for more details
about those threads.
The main thread will intercept the following signals:
- SIGHUP: reload configuration and log and changes done
- SIGTERM: cleanly terminate all threads and exit
A minimal communication protocol is implented, using the LISTEN/NOTIFY facility
provided by postgres. The dedicated main thread repository connection listens
on the "powa_collector" channel. A client, such as powa-web, can send requests
on this channel and the main thread will act and respond accordingly.
The requests are of the following form:
COMMAND RESPONSE_CHANNEL OPTIONAL_ARGUMENTS
See the README.md file for the full protocol documentation.
"""

from powa_collector.options import (parse_options, get_full_config,
add_servers_config)
from powa_collector.powa_worker import PowaThread
import psycopg2
import time
import select
import logging
import json
import signal

__VERSION__ = '0.0.1'
Expand All @@ -32,6 +59,7 @@ def connect(self, options):
try:
self.logger.debug("Connecting on repository...")
self.__repo_conn = psycopg2.connect(options["repository"]['dsn'])
self.__repo_conn.autocommit = True
self.logger.debug("Connected.")
cur = self.__repo_conn.cursor()
cur.execute("SET application_name = %s",
Expand All @@ -46,6 +74,69 @@ def connect(self, options):

return True

def process_notification(self):
if (not self.__repo_conn):
return

self.__repo_conn.poll()
cur = self.__repo_conn.cursor()

while (self.__repo_conn.notifies):
notif = self.__repo_conn.notifies.pop(0).payload.split(' ')
status = ''
cmd = notif.pop(0)
channel = "-"
status = "OK"
data = None

# the channel is mandatory, but if the message can be processed
# without answering, we'll try to
if (len(notif) > 0):
channel = notif.pop(0)

self.logger.debug("Received async command: %s %s %r" %
(cmd, channel, notif))

if (cmd == "RELOAD"):
self.reload_conf()
data = 'OK'
elif (cmd == "WORKERS_STATUS"):
# ignore the message if no channel was received
if (channel != '-'):
# did the caller requested a single server only? We ignore
# anything but the first parameter passed
if (len(notif) > 0 and notif[0].isdigit()):
w_id = int(notif[0])
data = json.dumps(self.list_workers(w_id, False))
else:
data = json.dumps(self.list_workers(None, False))
# everything else is unhandled
else:
status = 'UNKNOWN'
data = ''

# if there was a response channel, reply back
if (channel != '-'):
payload = ("%(cmd)s %(status)s %(data)s" %
{'cmd': cmd, 'status': status, 'data': data})

# with default configuration, postgres only accept up to 8k
# bytes payload. If the payload is longer, just warn the
# caller that it didn't fit.
# XXX we could implement multi-part answer, but if we ever
# reach that point, we should consider moving to a table
if (len(payload.encode('utf-8')) >= 8000):
payload = ("%(cmd)s %(status)s %(data)" %
{'cmd': cmd,
'status': "KO",
'data': "ANSWER TOO LONG"})

cur.execute("""NOTIFY "%(channel)s", '%(payload)s'""" %
{'channel': channel,
'payload': payload})

cur.close()

def main(self):
raw_options = parse_options()
self.logger.info("Starting powa-collector...")
Expand Down Expand Up @@ -74,7 +165,9 @@ def main(self):
if (not self.__repo_conn):
self.connect(raw_options)

time.sleep(10)
select.select([self.__repo_conn], [], [], 10)

self.process_notification()
except KeyboardInterrupt:
self.logger.debug("KeyboardInterrupt caught")
self.logger.info("Stopping all workers and exiting...")
Expand All @@ -100,13 +193,35 @@ def sighandler(self, signum, frame):
else:
self.logger.error("Unhandled signal %d" % signum)

def list_workers(self):
self.logger.info('List of workers:')
def list_workers(self, wanted_srvid=None, tostdout=True):
res = {}

if (tostdout):
self.logger.info('List of workers:')

for k, worker in self.workers.items():
# self.logger.info(" %s%s" % (k, "" if (worker.isAlive()) else
# " (stopped)"))
self.logger.info("%r%s" % (worker, "" if (worker.isAlive()) else
" (stopped)"))
worker_srvid = self.config["servers"][k]["srvid"]

# ignore this entry if caller want information for only one server
if (wanted_srvid is not None and wanted_srvid != worker_srvid):
continue

status = "Unknown"
if (worker.is_stopping()):
status = "stopping"
elif (worker.isAlive()):
status = "running"
else:
status = "stopped"

if (tostdout):
self.logger.info("%r (%s)" % (worker, status))
else:
res[worker_srvid] = status

return res

def reload_conf(self):
self.list_workers()
Expand Down
15 changes: 14 additions & 1 deletion powa_collector/powa_worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
"""
PowaThread: powa-collector dedicated remote server thread.
One of such thread is started per remote server by the main thred. Each
threads will use 2 connections:
- a persistent dedicated connection to the remote server, where it'll get
the source data
- a connection to the repository server, to write the source data and
perform the snapshot. This connection is created and dropped at each
checkpoint
"""
import threading
import time
import calendar
Expand All @@ -6,7 +18,7 @@
from os import SEEK_SET
import sys
from powa_collector.snapshot import (get_snapshot_functions, get_src_query,
get_tmp_name)
get_tmp_name)
if (sys.version_info < (3, 0)):
from StringIO import StringIO
else:
Expand Down Expand Up @@ -159,6 +171,7 @@ def __disconnect_repo(self):
self.__repo_conn = None

def __disconnect_all_and_exit(self):
# this is the exit point
self.__disconnect_all()
self.logger.info("stopped")
self.__stopping.clear()
Expand Down

0 comments on commit 77776c5

Please sign in to comment.