Skip to content

Commit

Permalink
Keep worker running on connection error, and retry at each snapshot
Browse files Browse the repository at this point in the history
Also accurately change the workers' reported status for the various connection
error possibility.

Thanks to Adrien Nayrat for the feature request.
  • Loading branch information
rjuju committed Apr 3, 2019
1 parent ace3242 commit 088e793
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
5 changes: 4 additions & 1 deletion powa_collector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def list_workers(self, wanted_srvid=None, tostdout=True):
if (worker.is_stopping()):
status = "stopping"
elif (worker.isAlive()):
status = "running"
status = worker.get_status()
else:
status = "stopped"

Expand Down Expand Up @@ -258,6 +258,9 @@ def reload_conf(self):
cur = config_new["servers"][k]
if (not conf_are_equal(cur, self.workers[k].get_config())):
self.workers[k].ask_reload(cur)
# also try to reconnect if the worker experienced connection issue
elif(self.workers[k].get_status() != "running"):
self.workers[k].ask_reload(cur)

self.config = config_new
self.logger.info('Reload done')
Expand Down
43 changes: 35 additions & 8 deletions powa_collector/powa_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, name, repository, config):
self.__pending_config = None
self.__remote_conn = None
self.__repo_conn = None
self.__last_repo_conn_errored = False
self.logger = logging.getLogger("powa-collector")
self.last_time = None

Expand Down Expand Up @@ -131,6 +132,7 @@ def __connect(self):
('PoWA collector - repo_conn for worker ' + self.name,))
cur.close()
self.__repo_conn.commit()
self.__last_repo_conn_errored = False

if (self.__remote_conn is None):
self.logger.debug("Connecting on remote database...")
Expand All @@ -148,13 +150,12 @@ def __connect(self):
self.__connected.set()
except psycopg2.Error as e:
self.logger.error("Error connecting on %s:\n%s" %
(self.__config["dsn"], e))
(self.__config["dsn"], e))

if (self.__repo_conn is not None):
self.__report_error("%s" % (e))

self.__disconnect_all()
self.__stopping.set()
else:
self.__last_repo_conn_errored = True

def __disconnect_all(self):
if (self.__remote_conn is not None):
Expand Down Expand Up @@ -183,7 +184,7 @@ def __worker_main(self):

# if this worker has been restarted, restore the previous snapshot
# time to try to keep up on the same frequency
if (not self.is_stopping()):
if (not self.is_stopping() and self.__repo_conn is not None):
cur = None
try:
cur = self.__repo_conn.cursor()
Expand All @@ -209,8 +210,9 @@ def __worker_main(self):
# spikes if the collector itself was stopped for a long time, or if a
# lot of new servers were added
if (not self.is_stopping()
and (calendar.timegm(time.gmtime()) -
self.last_time) > self.__config["frequency"]):
and self.last_time is not None
and ((calendar.timegm(time.gmtime()) -
self.last_time) > self.__config["frequency"])):
random.seed()
r = random.randint(0, self.__config["frequency"] - 1)
self.logger.debug("Spreading snapshot: setting last snapshot to"
Expand All @@ -225,7 +227,17 @@ def __worker_main(self):

if ((self.last_time is None) or
(cur_time - self.last_time) >= self.__config["frequency"]):
self.__take_snapshot()
try:
self.__take_snapshot()
except psycopg2.Error as e:
self.logger.error("Error during snapshot: %s" % e)
if (self.__repo_conn is None
or self.__repo_conn.closed > 0):
self.__repo_conn = None
if (self.__remote_conn is None
or self.__remote_conn.closed > 0):
self.__remote_conn = None

self.last_time = calendar.timegm(time.gmtime())
time_to_sleep = self.__config["frequency"] - (cur_time -
self.last_time)
Expand Down Expand Up @@ -260,6 +272,14 @@ def __take_snapshot(self):

self.__connect()

if (self.__remote_conn is None):
self.logger.error("No connection to remote server, snapshot skipped")
return

if (self.__repo_conn is None):
self.logger.error("No connection to repository server, snapshot skipped")
return

# get the list of snapshot functions, and their associated query_src
cur = self.__repo_conn.cursor()
cur.execute(get_snapshot_functions(), (srvid,))
Expand Down Expand Up @@ -377,3 +397,10 @@ def ask_reload(self, new_config):
self.__got_sighup.set()
self.__stop_sleep.set()

def get_status(self):
if (self.__repo_conn is None and self.__last_repo_conn_errored):
return "no connection to repository server"
if (self.__remote_conn is None):
return "no connection to remote server"
else:
return "running"

0 comments on commit 088e793

Please sign in to comment.