Skip to content

Commit

Permalink
Automatically reconnect on idle, fixes #34
Browse files Browse the repository at this point in the history
  • Loading branch information
tris committed Oct 12, 2023
1 parent 3026cc6 commit 6d31dfd
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions ecoflow_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
import paho.mqtt.client as mqtt
from queue import Queue
from prometheus_client import start_http_server, REGISTRY, Gauge, Counter
from threading import Timer


class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)


class EcoflowMetricException(Exception):
Expand Down Expand Up @@ -89,14 +96,28 @@ def get_json_response(self, request):

class EcoflowMQTT():

def __init__(self, message_queue, device_sn, username, password, addr, port, client_id):
def __init__(self, message_queue, device_sn, username, password, addr, port, client_id, timeout_seconds):
self.message_queue = message_queue
self.addr = addr
self.port = port
self.username = username
self.password = password
self.client_id = client_id
self.topic = f"/app/device/property/{device_sn}"
self.timeout_seconds = timeout_seconds
self.last_message_time = None
self.client = None

self.connect()

self.idle_timer = RepeatTimer(10, self.idle_reconnect)
self.idle_timer.daemon = True
self.idle_timer.start()

def connect(self):
if self.client:
self.client.loop_stop()
self.client.disconnect()

self.client = mqtt.Client(self.client_id)
self.client.username_pw_set(self.username, self.password)
Expand All @@ -110,6 +131,11 @@ def __init__(self, message_queue, device_sn, username, password, addr, port, cli
self.client.connect(self.addr, self.port)
self.client.loop_start()

def idle_reconnect(self):
if self.last_message_time and time.time() - self.last_message_time > self.timeout_seconds:
log.error(f"No messages received for {self.timeout_seconds} seconds. Reconnecting to MQTT")
self.connect()

def on_connect(self, client, userdata, flags, rc):
match rc:
case 0:
Expand Down Expand Up @@ -139,6 +165,7 @@ def on_disconnect(self, client, userdata, rc):

def on_message(self, client, userdata, message):
self.message_queue.put(message.payload.decode("utf-8"))
self.last_message_time = time.time()


class EcoflowMetric:
Expand Down Expand Up @@ -280,6 +307,7 @@ def main():
ecoflow_password = os.getenv("ECOFLOW_PASSWORD")
exporter_port = int(os.getenv("EXPORTER_PORT", "9090"))
collecting_interval_seconds = int(os.getenv("COLLECTING_INTERVAL", "10"))
timeout_seconds = int(os.getenv("MQTT_TIMEOUT", "60"))

if (not device_sn or not ecoflow_username or not ecoflow_password):
log.error("Please, provide all required environment variables: DEVICE_SN, ECOFLOW_USERNAME, ECOFLOW_PASSWORD")
Expand All @@ -293,7 +321,7 @@ def main():

message_queue = Queue()

EcoflowMQTT(message_queue, device_sn, auth.mqtt_username, auth.mqtt_password, auth.mqtt_url, auth.mqtt_port, auth.mqtt_client_id)
EcoflowMQTT(message_queue, device_sn, auth.mqtt_username, auth.mqtt_password, auth.mqtt_url, auth.mqtt_port, auth.mqtt_client_id, timeout_seconds)

metrics = Worker(message_queue, device_name, collecting_interval_seconds)

Expand Down

0 comments on commit 6d31dfd

Please sign in to comment.