-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnode_status_bot.py
158 lines (125 loc) · 3.91 KB
/
node_status_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
import argparse
import contextlib
import datetime
import json
import os
import requests
import select
import socket
import subprocess
import sys
import time
import _thread as thread
import threading
from time import sleep
# Telegram bot token and destination chat id; both are validated in main().
TOKEN = os.getenv("TOKEN")
CHANNEL_ID = os.getenv("CHANNEL_ID")

# Durations are configured in milliseconds and converted to seconds at use.
POLLING_INTERVAL = int(os.getenv("POLLING_INTERVAL", "60000"))  # default: 1 minute
TIMEOUT = int(os.getenv("TIMEOUT", "5000"))  # default: 5 seconds
def get_node_list():
    """Collect node definitions from the NODE_<idx>_* environment variables.

    Indices are probed from 0 upwards. The first index for which
    get_node_from_env raises KeyError (a mandatory variable is missing)
    ends the list.

    Returns:
        A list of the node dicts produced by get_node_from_env, in index
        order. Empty when NODE_0_* is not configured.

    Raises:
        ValueError: if a configured NODE_<idx>_PORT is not an integer.
    """
    node_list = []
    idx = 0
    while True:
        try:
            node_list.append(get_node_from_env(idx))
        except KeyError:
            # Only a missing variable marks the end of the list. Anything
            # else (bad port value, Ctrl-C, ...) must not be mistaken for it,
            # otherwise the remaining nodes would be dropped silently.
            return node_list
        idx += 1
def get_node_from_env(idx):
    """Build the node description for index idx from environment variables.

    NODE_<idx>_NAME and NODE_<idx>_HOST are mandatory; NODE_<idx>_PORT is
    optional and falls back to 21338.

    Returns:
        A dict with the keys "name", "host" and "port" (port as int).

    Raises:
        KeyError: if the name or host variable is not set.
        ValueError: if the port variable is not an integer.
    """
    env = os.environ
    node = {
        "name": env[f"NODE_{idx}_NAME"],
        "host": env[f"NODE_{idx}_HOST"],
    }
    node["port"] = int(env.get(f"NODE_{idx}_PORT", "21338"))
    return node
# https://stackoverflow.com/a/64233946
def recv_timeout(sock, timeout_seconds):
    """Read one newline-terminated line from sock within timeout_seconds.

    The socket is switched to non-blocking mode and polled with select()
    until a full line has arrived, the peer closes the connection, or the
    deadline passes. Reading directly with recv() avoids makefile(), which
    is documented to require a blocking socket and could hand back a
    truncated line when the response arrives in several pieces.

    Args:
        sock: Connected stream socket to read from.
        timeout_seconds: Total time budget for the whole line, in seconds.

    Returns:
        The line as bytes, including its trailing newline. If the peer
        closes first, whatever was received so far (possibly b"").
        Bytes received after the first newline are discarded.

    Raises:
        socket.timeout: if no complete line arrived before the deadline.
    """
    sock.setblocking(False)
    deadline = time.monotonic() + timeout_seconds
    received = bytearray()
    while True:
        remaining = max(0.0, deadline - time.monotonic())
        readable, _, _ = select.select([sock], [], [], remaining)
        if not readable:
            raise socket.timeout()
        try:
            chunk = sock.recv(4096)
        except BlockingIOError:
            # select() may report readiness spuriously; retry within budget.
            if time.monotonic() >= deadline:
                raise socket.timeout() from None
            continue
        if not chunk:
            # Peer closed the connection before sending a newline.
            break
        received += chunk
        if b"\n" in chunk:
            break
    line, newline, _ = bytes(received).partition(b"\n")
    return line + newline
def check_node(node):
    """Ask a node for its sync status over a TCP JSON-RPC connection.

    Args:
        node: Dict with the keys "name", "host" and "port".

    Returns:
        True only when the node answers and the "node_state" field inside
        the response's "result" equals "Synced". False when the connection
        fails, the request times out, or the reply is malformed.
    """
    # The context manager closes the socket on every path; this function is
    # called once per node per polling round, so a leak would accumulate.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(TIMEOUT / 1000)
        try:
            s.connect((node["host"], node["port"]))
        except Exception as error:
            # Any connection failure is reported as "node not synced".
            print("Failed to connect to node:", error)
            return False

        print("Sending syncStatus to node", node["name"])
        try:
            data_bytes = send_sync_status_request(s)
        except OSError as error:
            # Covers socket.timeout and send/receive errors: a node that
            # does not answer is treated like one that cannot be reached.
            print("Failed to query node:", error)
            return False

    print(data_bytes)
    try:
        data = json.loads(data_bytes)
        return data["result"]["node_state"] == "Synced"
    except (KeyError, TypeError, ValueError) as error:
        # ValueError includes json.JSONDecodeError; TypeError covers replies
        # whose JSON is valid but not shaped like {"result": {...}}.
        print("Unexpected syncStatus response:", error)
        return False
def send_sync_status_request(s):
    """Send a JSON-RPC "syncStatus" request on s and return the raw reply.

    The request is serialized as a single UTF-8 JSON line terminated by a
    newline. The reply is read with recv_timeout using the module TIMEOUT
    (milliseconds, converted to seconds).

    Args:
        s: Connected socket to the node.

    Returns:
        The bytes returned by recv_timeout.
    """
    payload = {"jsonrpc": "2.0", "id": "1", "method": "syncStatus", "params": None}
    wire = json.dumps(payload).encode("utf-8") + b"\n"
    s.sendall(wire)
    return recv_timeout(s, TIMEOUT / 1000)
def send_message(text):
    """Post text to the configured Telegram chat via the Bot API.

    Uses the module-level TOKEN and CHANNEL_ID and the sendMessage method.

    Args:
        text: Message body to deliver.

    Raises:
        requests.HTTPError: if Telegram answers with an error status.
        requests.RequestException: on connection failure or timeout.
    """
    print("Sending message", text)
    # Disable sending message for testing:
    # return
    url = f"https://api.telegram.org/bot{TOKEN}/sendMessage"
    params = {
        "chat_id": CHANNEL_ID,
        "text": text,
    }
    # requests waits indefinitely unless given a timeout; bound the call so an
    # unresponsive API cannot stall the polling loop forever.
    resp = requests.get(url, params=params, timeout=TIMEOUT / 1000)
    # Throw an exception if Telegram API fails
    resp.raise_for_status()
def create_message(node, message):
    """Format a status line for node: icon, name, host, then message.

    Args:
        node: Dict with the keys "synced", "name" and "host".
        message: Free text appended after the node identification.

    Returns:
        The formatted string, prefixed with a check mark when
        node["synced"] is truthy and a cross otherwise.
    """
    if node["synced"]:
        icon = "✅"
    else:
        icon = "❌"
    name, host = node["name"], node["host"]
    return f"{icon} {name}({host}) {message}"
def main(args):
    """Poll every configured node forever and report sync-state changes.

    Returns early with a message when TOKEN or CHANNEL_ID is not set.
    Otherwise each node is checked once per polling round, and a Telegram
    message is sent whenever its sync state differs from the previous
    round (including the very first check, when no state is stored yet).

    Args:
        args: Parsed command-line arguments; currently unused.
    """
    if TOKEN is None:
        print("Mandatory environment variable TOKEN is missing")
        return
    if CHANNEL_ID is None:
        print("Mandatory environment variable CHANNEL_ID is missing")
        return

    print("Getting node list")
    nodes = get_node_list()

    while True:
        for node in nodes:
            print(f"Checking node {node['name']}")
            try:
                synced = check_node(node)
            except Exception as error:
                # A failed probe counts as "not synced" instead of killing
                # the bot. KeyboardInterrupt is not an Exception subclass,
                # so Ctrl-C still stops the program.
                print("Failed to check node:", error)
                synced = False

            if node.get("synced") == synced:
                print("Same as before:", "synced" if synced else "not synced")
                continue

            # Store the new state first: create_message reads it for the icon.
            node["synced"] = synced
            status_text = "Synced" if synced else "Not synced :("
            send_message(create_message(node, status_text))

        time.sleep(POLLING_INTERVAL / 1000)
if __name__ == "__main__":
    # Script entry point: no options are defined, but argparse still
    # provides --help and rejects unknown arguments before main() starts.
    cli = argparse.ArgumentParser(
        description="Send a notification to a telegram group when one of the specified nodes is down"
    )
    main(cli.parse_args())