-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata2mqtt.py
309 lines (271 loc) · 12.2 KB
/
data2mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
"""importing dependencies"""
import os
import sys
import json
import csv
import time
import argparse
from io import StringIO
from urllib.parse import urlparse
import yaml
import requests
import xmltodict
import paho.mqtt.client as mqtt
from logger import log
def publish_to_mqtt(client, topic, value, prefix=""):
    """Publish one value on one MQTT topic.

    Args:
        client: Connected MQTT client; its ``publish`` method is called with
            the final topic and *value* and no further options.
        topic: Topic name relative to *prefix*.
        value: Payload handed unchanged to ``client.publish``.
        prefix: Optional topic prefix. When non-empty, the published topic is
            ``"<prefix>.<topic>"``; otherwise it is *topic* itself.

    The message is logged at level 4 before it is sent.
    """
    # NOTE(review): levels are joined with "." rather than MQTT's "/" level
    # separator, so subscribers cannot use per-level wildcards on these
    # topics. Kept as-is because existing subscribers depend on the names.
    if prefix:
        target_topic = f"{prefix}.{topic}"
    else:
        target_topic = topic
    log(f"Publishing to MQTT: Topic: {target_topic}, Value: {value}", 4)
    client.publish(target_topic, value)
def process_json(client, json_obj, parent_key="", prefix=""):
    """Flatten a nested mapping and publish each leaf value.

    Nested dictionaries are walked depth-first in insertion order. A child key
    is joined to its parent key with ``"."`` (or used alone when the parent key
    is empty). Every non-dictionary value, including lists, is converted with
    ``str()`` and published via ``publish_to_mqtt`` with *prefix*.

    Args:
        client: MQTT client passed through to ``publish_to_mqtt``.
        json_obj: Parsed document; only a ``dict`` is processed.
        parent_key: Key path of *json_obj* inside the top-level document
            (empty at the top level).
        prefix: Topic prefix passed through to ``publish_to_mqtt``.

    When *json_obj* is not a dictionary (e.g. a top-level list), nothing is
    published and an error is logged at level 1.
    """
    if not isinstance(json_obj, dict):
        log("The JSON object is not structured as expected.", 1)
        return
    for name, item in json_obj.items():
        key_path = f"{parent_key}.{name}" if parent_key else name
        if isinstance(item, dict):
            # Descend so nested leaves are published before the next sibling.
            process_json(client, item, key_path, prefix)
            continue
        publish_to_mqtt(client, key_path, str(item), prefix)
def process_xml(client, xml_data, prefix=""):
    """Convert XML text into nested dictionaries and publish every leaf.

    ``xmltodict.parse`` builds the nested mapping, which ``process_json``
    flattens and publishes with *prefix*. Any exception raised while parsing
    or publishing is logged at level 1 and not re-raised.
    """
    try:
        process_json(client, xmltodict.parse(xml_data), prefix=prefix)
    except Exception as exc:  # broad on purpose: failures are logged, not raised
        log(f"Error processing XML data: {exc}", 1)
def process_yaml(client, yaml_data, prefix=""):
    """Parse YAML text with ``yaml.safe_load`` and publish the document.

    The parsed document goes to ``process_json``. A ``yaml.YAMLError`` is
    logged at level 1, and nothing is published in that case.
    """
    try:
        document = yaml.safe_load(yaml_data)
    except yaml.YAMLError as err:
        log(f"Error processing YAML data: {err}", 1)
        return
    process_json(client, document, prefix=prefix)
def process_csv(client, csv_data, prefix=""):
    """Parse CSV text and publish every cell of every data row.

    The first line is the header (``csv.DictReader``). Each cell is published
    via ``publish_to_mqtt`` under its column name, row by row. A CSV with
    several data rows therefore publishes to the same topics once per row.
    Cells missing from a short row arrive as ``None`` and are passed on
    unchanged.

    Surplus cells (a row with more fields than the header) have no column
    name. ``DictReader`` stores them as a list under the key ``None``.
    Publishing that list under a ``None`` topic is rejected by the MQTT
    client, which previously aborted all remaining rows. Such cells are now
    skipped and reported in the log instead.

    Any other exception is logged at level 1 and stops processing.
    """
    try:
        reader = csv.DictReader(StringIO(csv_data))
        for row in reader:
            for key, value in row.items():
                if key is None:
                    log(f"Skipping {len(value)} CSV field(s) without a header "
                        f"on line {reader.line_num}", 1)
                    continue
                publish_to_mqtt(client, key, value, prefix)
    except Exception as e:
        log(f"Error processing CSV data: {e}", 1)
def detect_and_process_data(client, data, content_type, prefix=""):
    """Dispatch *data* to the parser for its media type, which then publishes it.

    *content_type* may be a bare media type (``"application/json"``) or a full
    HTTP ``Content-Type`` value with parameters
    (``"application/json; charset=utf-8"``). Parameters are ignored, and the
    comparison is case-insensitive.

    Recognised types:
        JSON: ``application/json``, ``text/json``, any ``*+json``.
        XML:  ``application/xml``, ``text/xml``, any ``*+xml``.
        YAML: ``application/x-yaml``, ``application/yaml``, ``text/yaml``,
              ``text/x-yaml``, any ``*+yaml``.
        CSV:  ``text/csv``, ``application/csv``.

    Invalid JSON and unrecognised or missing types are logged at level 1, and
    nothing is published.
    """
    log(f"trying to parse data: {content_type}",15)
    # Servers routinely append parameters such as "; charset=utf-8"; only the
    # bare media type decides which parser is used.
    media_type = (content_type or "").split(";", 1)[0].strip().lower()
    if media_type in ('application/json', 'text/json') or media_type.endswith('+json'):
        try:
            json_data = json.loads(data)
        except json.JSONDecodeError as e:
            log(f"Error processing JSON data: {e}", 1)
            return
        process_json(client, json_data, prefix=prefix)
    elif media_type in ('application/xml', 'text/xml') or media_type.endswith('+xml'):
        process_xml(client, data, prefix=prefix)
    elif (media_type in ('application/x-yaml', 'application/yaml', 'text/yaml', 'text/x-yaml')
          or media_type.endswith('+yaml')):
        process_yaml(client, data, prefix=prefix)
    elif media_type in ('text/csv', 'application/csv'):
        process_csv(client, data, prefix=prefix)
    else:
        log("Unable to determine or process data format.", 1)
def fetch_and_publish_data(client, url, auth, verify, prefix, timeout=30):
    """Read data from *url* and hand it to ``detect_and_process_data``.

    Sources:
        * ``file://`` URLs are read from the local file system after
          percent-decoding the path (``my%20data.json`` -> ``my data.json``).
        * A plain path without a URL scheme (``/tmp/data.json``,
          ``data.json``) is also read locally, as the CLI help advertises.
          A Windows drive path such as ``C:\\data.json`` parses as scheme
          ``c`` and is NOT treated as local.
        * Anything else is fetched with ``requests.get``.

    For local files the content type is guessed from the file extension. For
    HTTP it is taken from the response's ``Content-Type`` header.

    Args:
        client: MQTT client passed through to the publishers.
        url: Source URL or local path.
        auth: Passed to ``requests.get`` as ``auth`` (``None`` for none).
        verify: Passed to ``requests.get`` as ``verify``.
        prefix: Topic prefix passed through to the publishers.
        timeout: Seconds ``requests.get`` may wait on the server before
            giving up; ``None`` restores the old wait-forever behavior.

    File read errors and ``requests`` errors (including timeouts) are logged
    at level 1 and not raised.
    """
    from urllib.request import url2pathname  # only needed to decode file:// paths

    log("Fetching data ...", 15)
    if not url:
        log("Error: No URL or file path given.", 1)
        return
    parsed_url = urlparse(url)
    if parsed_url.scheme in ('file', ''):
        log("this is a local file data source",15)
        if parsed_url.scheme == 'file':
            # url2pathname undoes percent-encoding (and maps "/C:/..." to a
            # drive path on Windows).
            file_path = url2pathname(parsed_url.path)
        else:
            file_path = url
        if not os.path.exists(file_path):
            log(f"Error: Local file {file_path} not found.", 1)
            return
        try:
            with open(file_path, 'r') as file:
                data = file.read()
            content_type = guess_content_type(file_path)
            log(f"Content type detected as: {content_type}",15)
            detect_and_process_data(client, data, content_type, prefix)
        except Exception as e:
            log(f"Error reading local file {file_path}: {e}", 1)
    else:
        # Handle HTTP/HTTPS
        log(f"this is a remote data source ({url})",15)
        try:
            # Without a timeout a stalled server would block this run, and
            # every later scheduled run, indefinitely.
            response = requests.get(url, auth=auth, verify=verify, timeout=timeout)
            response.raise_for_status()  # Raise an exception for HTTP errors
            content_type = response.headers.get('Content-Type', '').lower()
            data = response.text
            log(f"data received: \n {data}",25)
            detect_and_process_data(client, data, content_type, prefix)
        except requests.exceptions.RequestException as e:
            log(f"Error fetching data from the URL: {e}", 1)
def guess_content_type(file_path):
    """Guess a media type for *file_path* from its file extension.

    Matching is case-insensitive, so ``DATA.JSON`` and ``feed.Xml`` are
    recognised. Unknown or missing extensions yield ``'text/plain'``.

    Returns:
        str: ``'application/json'``, ``'application/xml'``,
        ``'application/x-yaml'``, ``'text/csv'`` or ``'text/plain'``.
    """
    _, ext = os.path.splitext(file_path)
    types_by_extension = {
        '.json': 'application/json',
        '.jsn': 'application/json',
        '.xml': 'application/xml',
        '.yaml': 'application/x-yaml',
        '.yml': 'application/x-yaml',
        '.csv': 'text/csv',
    }
    return types_by_extension.get(ext.lower(), 'text/plain')
def load_config_file(config_file):
    """Load the list of configuration sets from a YAML file.

    The file must contain a mapping whose ``configurations`` key holds a list
    of configuration sets. An empty file, or a missing or null
    ``configurations`` key, yields an empty list.

    Exits the process with status 1, after logging an error, when the file
    is missing or unreadable, is not valid YAML, or has the wrong structure.
    """
    try:
        with open(config_file, 'r') as file:
            config_data = yaml.safe_load(file)
    except FileNotFoundError:
        log(f"Error: Configuration file {config_file} not found.", 1)
        sys.exit(1)
    except OSError as e:
        # e.g. a directory or a permission problem.
        log(f"Error: Unable to read configuration file {config_file}: {e}", 1)
        sys.exit(1)
    except yaml.YAMLError as e:
        log(f"Error: Failed to parse YAML configuration file: {e}", 1)
        sys.exit(1)
    if config_data is None:  # safe_load returns None for an empty document
        return []
    if not isinstance(config_data, dict):
        log(f"Error: Configuration file {config_file} must contain a mapping "
            f"at the top level.", 1)
        sys.exit(1)
    # "configurations:" with no value parses as None; treat it as empty.
    configurations = config_data.get('configurations') or []
    if not isinstance(configurations, list):
        log(f"Error: 'configurations' in {config_file} must be a list.", 1)
        sys.exit(1)
    return configurations
def get_config_by_name(configurations, name):
    """Return the first configuration set whose ``'name'`` equals *name*.

    The matching set is returned itself, not a copy. When no set matches, an
    error is logged at level 1 and the process exits with status 1.
    """
    not_found = object()
    found = next(
        (candidate for candidate in configurations if candidate.get('name') == name),
        not_found,
    )
    if found is not_found:
        log(f"Error: Configuration with name '{name}' not found in the configuration file.", 1)
        sys.exit(1)
    return found
def merge_configs(base_config, override_config):
    """Return a new dict with *override_config* laid over *base_config*.

    Only override values that are not ``None`` replace or add keys. Falsy
    values such as ``0``, ``""`` or ``False`` still override. Neither input is
    modified.
    """
    merged = dict(base_config)
    for key, value in override_config.items():
        if value is not None:
            merged[key] = value
    return merged
def parse_mqtt_host_and_port(mqtt_host, mqtt_port, default_port=1883):
    """Split an MQTT server specification into ``(host, port)``.

    *mqtt_host* may take any of these forms:
        * ``"host"`` or ``"host:port"``
        * a bare IPv6 literal such as ``"fe80::1"`` (never carries a port)
        * a bracketed IPv6 literal with optional port: ``"[::1]"``,
          ``"[::1]:1884"``

    A port embedded in *mqtt_host* takes precedence over *mqtt_port*. When
    neither supplies one, *default_port* (1883, the standard unencrypted MQTT
    port) is used. String ports, e.g. from YAML, are converted to ``int``.

    Returns:
        tuple: ``(host, port)`` with IPv6 brackets removed and ``port`` an int.

    Exits the process with status 1, after logging an error, when the host
    is empty or malformed or the port is not an integer in 1..65535.
    """
    if not mqtt_host:
        log("Error: No MQTT server specified.", 1)
        sys.exit(1)

    host, port_text = mqtt_host, None
    if mqtt_host.startswith('['):
        inner, closed, rest = mqtt_host[1:].partition(']')
        if not closed or (rest and not rest.startswith(':')):
            log(f"Error: Invalid MQTT host '{mqtt_host}'.", 1)
            sys.exit(1)
        host = inner
        if rest:
            port_text = rest[1:]
    elif mqtt_host.count(':') == 1:
        host, port_text = mqtt_host.split(':')
    # Any other form has no embedded port: either no colon at all, or several
    # colons, which can only be an unbracketed IPv6 literal.

    if port_text is None and mqtt_port is None:
        return host, default_port
    raw_port = mqtt_port if port_text is None else port_text
    try:
        port = int(raw_port)
    except (TypeError, ValueError):
        port = -1
    if not 0 < port <= 65535:
        if port_text is None:
            log(f"Error: Invalid MQTT port '{raw_port}'.", 1)
        else:
            log(f"Error: Invalid port number '{raw_port}' in MQTT host.", 1)
        sys.exit(1)
    return host, port
def process_config(client, config, config_name):
    """Fetch one configuration set's data source and publish it through *client*.

    Keys read from *config*:
        url: Data source passed to ``fetch_and_publish_data`` (required).
        verify: TLS verification for HTTPS. Accepts ``True``/``False`` (e.g.
            from YAML), the strings ``"true"``/``"false"`` in any case, or the
            path of a CA bundle file. Missing or ``None`` means ``True``.
        username, password: Passed to requests as ``auth=(username,
            password)`` when both are set.
        mqtt_server: ``"host"`` or ``"host:port"``; defaults to
            ``"127.0.0.1"``.
        mqtt_port: Used when mqtt_server carries no port; defaults to 1883.
        prefix: Topic prefix (default ``""``).

    The client is connected before fetching and disconnected afterwards.
    Configuration, connection and fetch errors are logged at level 1 and the
    function returns normally. ``parse_mqtt_host_and_port`` may still exit
    the process on an invalid port.
    """
    # Log the configuration name at Loglevel 2 or higher
    log(f"Processing config: {config_name}", 2)
    url = config.get('url')
    if not url:
        log(f"Error: No 'url' defined for configuration '{config_name}'.", 1)
        return
    # Log the URL at Loglevel 3 or higher
    log(f"Fetching data from URL: {url}", 3)
    # Log all defined parameters at Loglevel 10, with credentials masked.
    safe_view = {key: ('***' if key in ('password', 'mqttpassword') and value else value)
                 for key, value in config.items()}
    log(f"Defined parameters: {safe_view}", 10)

    # Certificate verification configuration
    verify = config.get('verify')
    if isinstance(verify, str):
        choice = verify.lower()
        if choice == "false":
            verify = False
        elif choice == "true":
            # requests treats any *string* verify value as a CA-bundle path,
            # so the literal "true" must become the boolean True.
            verify = True
        elif os.path.isfile(verify):
            log(f"verify using custom CA file: {verify}",15)
        else:
            log(f"Error: The path provided for --verify does not exist or is not a file: {verify}", 1)
            return
    else:
        # YAML can deliver a real bool; None (not configured) means "verify".
        verify = verify is None or bool(verify)
    if verify is False:
        log("verify disabled",15)

    # Set up URL authentication if credentials are provided
    auth = None
    if config.get('username') and config.get('password'):
        auth = (config['username'], config['password'])

    # Parse MQTT host and port
    mqtt_server = config.get('mqtt_server') or "127.0.0.1"
    mqtt_host, mqtt_hostport = parse_mqtt_host_and_port(mqtt_server, config.get('mqtt_port'))
    if mqtt_hostport is None:
        mqtt_hostport = 1883  # standard unencrypted MQTT port
    log(f"Host {mqtt_host}:{mqtt_hostport}",4)

    # Connect to the MQTT server
    try:
        client.connect(mqtt_host, mqtt_hostport, 60)
    except Exception as e:
        log(f"Error connecting to the MQTT server: {e}", 1)
        return

    # Fetch and publish data, then always close the connection so repeated
    # runs do not leave one open socket behind per run.
    try:
        fetch_and_publish_data(client, url, auth, verify, config.get('prefix', ''))
    except Exception as e:
        log(f"Error during data fetch and publish: {e}", 1)
    finally:
        try:
            client.disconnect()
        except Exception as e:
            log(f"Error disconnecting from the MQTT server: {e}", 1)
def _build_arg_parser():
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description="Fetch data from a URL or a local file and publish it via MQTT.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--configfile", type=str, help="Path to YAML configuration file.")
    parser.add_argument("--config", type=str,
                        help="Name of the configuration set to use, or 'all' to process all "
                             "configurations. Can also be a comma-separated list of "
                             "configuration names.")
    parser.add_argument("url", type=str, nargs='?',
                        help="The URL or file path from which to fetch data "
                             "(supports HTTP/HTTPS and file://).")
    # No argparse default here: any non-None CLI value overrides the
    # configuration file, so a default would always replace the file's
    # mqtt_server. The 127.0.0.1 fallback is applied in main() instead.
    parser.add_argument("mqtt_server", type=str, nargs='?',
                        help="The IP address or hostname of the MQTT server, optionally with "
                             "port (e.g., '192.168.1.100:1884'). Falls back to 127.0.0.1 when "
                             "set neither here nor in the configuration file.")
    parser.add_argument("mqtt_port", type=int, nargs='?',
                        help="The port of the MQTT server (if not specified as part of the "
                             "MQTT host).")
    parser.add_argument("--prefix", type=str, help="Optional prefix for all MQTT topics.")
    parser.add_argument("--username", type=str,
                        help="Username for URL authentication (optional).")
    parser.add_argument("--password", type=str,
                        help="Password for URL authentication (optional).")
    parser.add_argument("--mqttuser", type=str,
                        help="Username for MQTT authentication (optional).")
    parser.add_argument("--mqttpassword", type=str,
                        help="Password for MQTT authentication (optional).")
    parser.add_argument("--verify", type=str,
                        help="SSL certificate verification for HTTPS requests ('false' to "
                             "disable, or path to a custom CA bundle).")
    parser.add_argument("--interval", type=int,
                        help="Interval in seconds to repeatedly fetch data from the URL or "
                             "file (optional).")
    return parser


def _select_config_sets(args):
    """Return the configuration sets chosen by --configfile/--config.

    Without --configfile, a single set is built from the command-line
    arguments and named "commandline".
    """
    if not args.configfile:
        return [{**vars(args), 'name': 'commandline'}]
    configurations = load_config_file(args.configfile)
    selection = args.config or "all"  # default to all sets when --config is omitted
    if selection.lower() == "all":
        return configurations
    names = [name.strip() for name in selection.split(",") if name.strip()]
    return [get_config_by_name(configurations, name) for name in names]


def _without_secrets(config):
    """Return *config* with password values masked, for logging."""
    if not isinstance(config, dict):
        return config
    return {key: ('***' if key in ('password', 'mqttpassword') and value else value)
            for key, value in config.items()}


def _create_mqtt_client(mqtt_version):
    """Return a new paho client speaking MQTT v5, or v3.1.1 for any other version."""
    if str(mqtt_version).strip().lower() in ("v5", "5", "5.0"):
        protocol = mqtt.MQTTv5
    else:
        protocol = mqtt.MQTTv311
    # paho-mqtt 2.x takes a callback API version as its first constructor
    # argument; 1.x has no CallbackAPIVersion at all. No callbacks are
    # registered here, so choosing VERSION2 is safe.
    callback_api = getattr(mqtt, "CallbackAPIVersion", None)
    if callback_api is not None:
        return mqtt.Client(callback_api.VERSION2, protocol=protocol)
    return mqtt.Client(protocol=protocol)


def _run_job(final_config, config_name):
    """Create a fresh MQTT client and run one configuration set once."""
    mqtt_version = final_config.get('mqtt_version', 'v3.1.1')  # Default to v3.1.1
    log(f"Using MQTT version '{mqtt_version}'", 15)
    client = _create_mqtt_client(mqtt_version)
    mqtt_user = final_config.get('mqttuser')
    if mqtt_user:
        # Authenticate only when a user is configured. paho sends any
        # non-None username, including "", as a credential.
        client.username_pw_set(mqtt_user, final_config.get('mqttpassword'))
    process_config(client, final_config, config_name)


def main():
    """Parse arguments, then run every selected configuration set on its schedule.

    Configuration sets come from ``--configfile`` (filtered by ``--config``)
    or, without a config file, from the command-line arguments alone.
    Command-line values that were supplied override the matching keys of
    every set.

    A set without ``interval`` runs once. A set with ``interval`` runs every
    *interval* seconds, with its first run one interval after start. main()
    returns once no set is left to run.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    if not args.configfile and not args.url:
        parser.error("a URL/file path or --configfile is required")

    cli_values = vars(args)
    start_time = time.time()
    jobs = []
    for config in _select_config_sets(args):
        if not isinstance(config, dict) or 'name' not in config:
            log(f"Error: Missing 'name' key in one of the configuration sets: "
                f"{_without_secrets(config)}", 1)
            continue  # skipping this configuration set
        # Drop None values so downstream .get(key, default) calls see their defaults.
        final_config = {key: value for key, value in merge_configs(config, cli_values).items()
                        if value is not None}
        if not final_config.get('mqtt_server'):
            final_config['mqtt_server'] = "127.0.0.1"
        # Only used when mqtt_server has no ":port" of its own.
        final_config.setdefault('mqtt_port', 1883)
        interval = final_config.get('interval')
        jobs.append({
            'name': config['name'],
            'config': final_config,
            'next_run': start_time + interval if interval else start_time,
        })

    while jobs:
        now = time.time()
        for job in jobs:
            if now < job['next_run']:
                continue
            _run_job(job['config'], job['name'])
            interval = job['config'].get('interval')
            # One-shot sets (no interval) are marked finished, not rescheduled.
            job['next_run'] = now + interval if interval else None
        jobs = [job for job in jobs if job['next_run'] is not None]
        if jobs:
            time.sleep(1)  # avoid busy waiting between scheduled runs
if __name__ == "__main__":
    # Run only when executed as a script. main() returns None, so the exit
    # status is 0 unless main() exits or raises.
    raise SystemExit(main())