diff --git a/docker/grafana/dashboards/home.json b/docker/grafana/dashboards/home.json index f5aa8904..5ed86af8 100644 --- a/docker/grafana/dashboards/home.json +++ b/docker/grafana/dashboards/home.json @@ -24,7 +24,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 1, + "id": 2, "links": [], "liveNow": false, "panels": [ @@ -45,7 +45,7 @@ "axisSoftMin": 0, "barAlignment": 0, "drawStyle": "bars", - "fillOpacity": 33, + "fillOpacity": 100, "gradientMode": "none", "hideFrom": { "legend": false, @@ -88,13 +88,14 @@ "overrides": [] }, "gridPos": { - "h": 6, - "w": 12, + "h": 7, + "w": 15, "x": 0, "y": 0 }, "id": 2, - "interval": "1m", + "interval": "5m", + "maxDataPoints": 3000, "options": { "legend": { "calcs": [], @@ -114,14 +115,14 @@ }, "editorMode": "code", "exemplar": true, - "expr": "delta(minio_bucket_usage_object_total{bucket=\"wis2box-incoming\"}[1h])", - "interval": "", - "legendFormat": "Received files per hour in wis2box-incoming", + "expr": "delta(wis2box_storage_incoming_total{}[$__interval])", + "interval": "5m", + "legendFormat": "New/updated files per hour in wis2box-incoming", "range": true, "refId": "A" } ], - "title": "Received files per hour in wis2box-incoming", + "title": "New/updated files in wis2box-incoming storage", "type": "timeseries" }, { @@ -129,10 +130,11 @@ "type": "loki", "uid": "P55348B596EBB51C3" }, + "description": "", "gridPos": { - "h": 18, + "h": 21, "w": 9, - "x": 12, + "x": 15, "y": 0 }, "id": 8, @@ -152,12 +154,15 @@ "type": "loki", "uid": "wis2box-loki" }, - "expr": "{compose_service=\"wis2box\"}", + "editorMode": "builder", + "expr": "{compose_service=\"wis2box\"} |~ `ERROR`", "hide": false, + "legendFormat": "", + "queryType": "range", "refId": "B" } ], - "title": "wis2box logging", + "title": "wis2box ERRORs", "type": "logs" }, { @@ -176,7 +181,7 @@ "axisSoftMax": 5, "barAlignment": 0, "drawStyle": "bars", - "fillOpacity": 30, + "fillOpacity": 100, "gradientMode": "none", "hideFrom": { "legend": false, @@ -219,13 +224,13 @@ "overrides": [] }, "gridPos": { - "h": 6, - "w": 12, + "h": 7, + "w": 15, "x": 0, - "y": 6 + "y": 7 }, "id": 6, - "interval": "1m", + "interval": "5m", "options": { "legend": { "calcs": [], @@ -245,14 +250,14 @@ }, "editorMode": "code", "exemplar": true, - "expr": "delta(minio_bucket_usage_object_total{bucket=\"wis2box-public\"}[1h])", - "interval": "", - "legendFormat": "New files per hour in wis2box-public", + "expr": "delta(wis2box_storage_public_total{}[$__interval])", + "interval": "5m", + "legendFormat": "New/updated files in wis2box-public", "range": true, "refId": "A" } ], - "title": "New files per hour in wis2box-public", + "title": "New/updated files in wis2box-public storage", "type": "timeseries" }, { @@ -271,7 +276,7 @@ "axisSoftMax": 5, "barAlignment": 0, "drawStyle": "bars", - "fillOpacity": 36, + "fillOpacity": 100, "gradientMode": "none", "hideFrom": { "legend": false, @@ -314,13 +319,13 @@ "overrides": [] }, "gridPos": { - "h": 6, - "w": 12, + "h": 7, + "w": 15, "x": 0, - "y": 12 + "y": 14 }, "id": 10, - "interval": "1m", + "interval": "5m", "options": { "legend": { "calcs": [], @@ -340,17 +345,18 @@ }, "editorMode": "code", "exemplar": true, - "expr": "delta(mqtt_msg_count_total[1h])", - "interval": "", - "legendFormat": "mqtt messages published per hour", + "expr": "delta(wis2box_notify_total[$__interval])", + "interval": "5m", + "legendFormat": "WIS2.0 notifications", "range": true, "refId": "A" } ], - "title": "mqtt messages published per hour", + "title": "Number of WIS2.0 notifications published by wis2box", "type": "timeseries" } ], + "refresh": "5s", "schemaVersion": 36, "style": "dark", "tags": [], @@ -358,7 +364,7 @@ "list": [] }, "time": { - "from": "now-2d", + "from": "now-6h", "to": "now" }, "timepicker": {}, diff --git a/docker/mqtt_metrics_collector/mqtt_metrics_collector.py b/docker/mqtt_metrics_collector/mqtt_metrics_collector.py index 3f084f00..b8d86f12 100644 --- a/docker/mqtt_metrics_collector/mqtt_metrics_collector.py +++ b/docker/mqtt_metrics_collector/mqtt_metrics_collector.py @@ -25,9 +25,8 @@ import paho.mqtt.client as mqtt import random -from urllib.parse import urlparse - import sys +import json from prometheus_client import start_http_server, Counter @@ -57,7 +56,6 @@ def sub_connect(client, userdata, flags, rc, properties=None): """ function executed 'on_connect' for paho.mqtt.client - subscribes to origin/# :param client: client-object associated to 'on_connect' :param userdata: userdata @@ -69,15 +67,26 @@ def sub_connect(client, userdata, flags, rc, properties=None): """ logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}") - for s in ["origin/#"]: + for s in ["wis2box/#", "wis2box-storage/#"]: client.subscribe(s, qos=1) -mqtt_msg_counter = Counter('mqtt_msg_count', - 'Nr of messages seen on MQTT') -mqtt_msg_topic_counter = Counter('mqtt_msg_count_topic', - 'Nr of messages seen on MQTT, by topic', - ["topic"]) +notify_total = Counter('wis2box_notify_total', + 'Total notifications sent by wis2box') +notify_topic_wsi_total = Counter('wis2box_notify_topic_wsi_total', + 'Total notifications sent by wis2box, by topic and WSI', # noqa + ["topic", "WSI"]) + +failure_total = Counter('wi2box_failure_total', + 'Total failed actions reported by wis2box') +failure_descr_wsi_total = Counter('wis2box_failure_detail_total', + 'Total failed actions sent by wis2box, by description and WSI', # noqa + ["description", "WSI"]) + +storage_incoming_total = Counter('wis2box_storage_incoming_total', + 'Total storage notifications received on incoming') # noqa +storage_public_total = Counter('wis2box_storage_public_total', + 'Total storage notifications received on public') # noqa def sub_mqtt_metrics(client, userdata, msg): @@ -91,10 +100,27 @@ def sub_mqtt_metrics(client, userdata, msg): :returns: `None` """ - # m = json.loads(msg.payload.decode('utf-8')) - logger.info(f"Received message on topic ={msg.topic}") - mqtt_msg_topic_counter.labels(msg.topic).inc(1) - mqtt_msg_counter.inc(1) + m = json.loads(msg.payload.decode('utf-8')) + logger.info(f"Received message on topic={msg.topic}") + + if str(msg.topic).startswith('wis2box/notifications'): + notify_topic_wsi_total.labels( + m['topic'], m['wigos_station_identifier']).inc(1) + notify_total.inc(1) + + if str(msg.topic).startswith('wis2box/failure'): + descr = m['description'] + wsi = 'none' + if 'wigos_station_identifier' in m: + wsi = m['wigos_station_identifier'] + failure_descr_wsi_total.labels(descr, wsi).inc(1) + failure_total.inc(1) + + if str(msg.topic).startswith('wis2box-storage'): + if str(m["Key"]).startswith('wis2box-incoming'): + storage_incoming_total.inc(1) + if str(m["Key"]).startswith('wis2box-public'): + storage_public_total.inc(1) def gather_mqtt_metrics(): @@ -103,30 +129,24 @@ def gather_mqtt_metrics(): :returns: `None` """ - # explicitly set the counter to 0 at the start - mqtt_msg_counter.inc(0) - broker_url = urlparse(os.environ.get('WIS2BOX_BROKER_PUBLIC', '')) + # connect to the internal broker + broker_host = os.environ.get('WIS2BOX_BROKER_HOST', '') + broker_username = os.environ.get('WIS2BOX_BROKER_USERNAME', '') + broker_password = os.environ.get('WIS2BOX_BROKER_PASSWORD', '') + broker_port = int(os.environ.get('WIS2BOX_BROKER_PORT', '1883')) # generate a random clientId for the mqtt-session r = random.Random() client_id = f"mqtt_metrics_collector_{r.randint(1,1000):04d}" try: logger.info("setup connection") - logger.info(f"host={broker_url.hostname}, user={broker_url.username}") + logger.info(f"host={broker_host}, user={broker_username}") client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5) client.on_connect = sub_connect client.on_message = sub_mqtt_metrics - client.username_pw_set(broker_url.username, broker_url.password) - _port = broker_url.port - if _port is None: - if broker_url.scheme == 'mqtts': - _port = 8883 - else: - _port = 1883 - if broker_url.scheme == 'mqtts': - client.tls_set(tls_version=2) - client.connect(broker_url.hostname, _port) + client.username_pw_set(broker_username, broker_password) + client.connect(broker_host, broker_port) client.loop_forever() except Exception as e: logger.error(f"Failed to setup MQTT-client with error: {e}") diff --git a/wis2box/data/base.py b/wis2box/data/base.py index a1f3573b..1a84e143 100644 --- a/wis2box/data/base.py +++ b/wis2box/data/base.py @@ -19,13 +19,16 @@ # ############################################################################### +import json import logging from pathlib import Path from typing import Iterator, Union from wis2box.api import upsert_collection_item from wis2box.env import (STORAGE_INCOMING, STORAGE_PUBLIC, - STORAGE_SOURCE, BROKER_PUBLIC) + STORAGE_SOURCE, BROKER_PUBLIC, + BROKER_HOST, BROKER_USERNAME, BROKER_PASSWORD, + BROKER_PORT) from wis2box.storage import put_data from wis2box.topic_hierarchy import TopicHierarchy from wis2box.plugin import load_plugin, PLUGINS @@ -58,9 +61,27 @@ def __init__(self, defs: dict) -> None: self.output_data = {} self.discovery_metadata = {} + # load plugin for local broker + defs2 = { + 'codepath': PLUGINS['pubsub']['mqtt']['plugin'], + 'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}", # noqa + 'client_type': 'notify-publisher' + } + self.local_broker = load_plugin('pubsub', defs2) + # if discovery_metadata: # self.setup_discovery_metadata(discovery_metadata) + def publish_failure_message(self, description, wsi=None): + message = { + 'filepath': self.incoming_filepath, + 'description': description + } + if wsi is not None: + message['wigos_station_identifier'] = wsi + # publish message + self.local_broker.pub('wis2box/failure', json.dumps(message)) + def setup_discovery_metadata(self, discovery_metadata: dict) -> None: """ Import discovery metadata @@ -131,7 +152,7 @@ def notify(self, identifier: str, storage_path: str, geometry, wigos_station_identifier) - # load plugin for broker + # load plugin for public broker defs = { 'codepath': PLUGINS['pubsub']['mqtt']['plugin'], 'url': BROKER_PUBLIC, @@ -143,6 +164,13 @@ def notify(self, identifier: str, storage_path: str, broker.pub(topic, wis_message.dumps()) LOGGER.info(f'WISNotificationMessage published for {identifier}') + # publish message for internal monitoring + notify_msg = { + 'topic': topic, + 'wigos_station_identifier': wigos_station_identifier + } + self.local_broker.pub('wis2box/notifications', json.dumps(notify_msg)) + LOGGER.debug('Pushing message to API') upsert_collection_item('messages', wis_message.message) diff --git a/wis2box/data/bufr4.py b/wis2box/data/bufr4.py index 6079857f..76a69191 100644 --- a/wis2box/data/bufr4.py +++ b/wis2box/data/bufr4.py @@ -169,6 +169,9 @@ def transform_subset(self, subset: int, subset_out: int) -> None: data_date = parser.get_time() except Exception as err: LOGGER.warning(err) + self.publish_failure_message( + description="Failed to parse time", + wsi=temp_wsi) return del parser @@ -178,6 +181,9 @@ def transform_subset(self, subset: int, subset_out: int) -> None: if wsi is None: msg = f'Failed to publish, wsi: {temp_wsi}, tsi: {temp_tsi}' LOGGER.error(msg) + self.publish_failure_message( + description="Invalid station", + wsi=temp_wsi) return LOGGER.debug('Copying wsi to BUFR') diff --git a/wis2box/handler.py b/wis2box/handler.py index 9dad3b1c..3465b072 100644 --- a/wis2box/handler.py +++ b/wis2box/handler.py @@ -27,6 +27,12 @@ from wis2box.storage import get_data from wis2box.topic_hierarchy import validate_and_load +from wis2box.plugin import load_plugin +from wis2box.plugin import PLUGINS + +from wis2box.env import (BROKER_HOST, BROKER_USERNAME, + BROKER_PASSWORD, BROKER_PORT) + LOGGER = logging.getLogger(__name__) @@ -53,21 +59,45 @@ def __init__(self, filepath: str, topic_hierarchy: str = None): th = self.filepath fuzzy = True + # handler uses local broker to publish success/failure messages + defs = { + 'codepath': PLUGINS['pubsub']['mqtt']['plugin'], + 'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}", # noqa + 'client_type': 'handler-publisher' + } + self.local_broker = load_plugin('pubsub', defs) + try: self.topic_hierarchy, self.plugins = validate_and_load( th, self.filetype, fuzzy=fuzzy) except Exception as err: msg = f'Topic Hierarchy validation error: {err}' LOGGER.error(msg) + self.publish_failure_message( + description='Topic hierarchy validation' + ) raise ValueError(msg) + def publish_failure_message(self, description, plugin=None): + message = { + 'filepath': self.filepath, + 'description': description + } + if plugin is not None: + cl = plugin.__class__ + message['plugin'] = f"{cl.__module__ }.{cl.__name__}" + # publish message + self.local_broker.pub('wis2box/failure', json.dumps(message)) + def handle(self) -> bool: for plugin in self.plugins: - if not plugin.accept_file(self.filepath): - LOGGER.info(f'file {self.filepath} not accepted') + msg = f'Filepath not accepted: {self.filepath}' + LOGGER.warning(msg) + self.publish_failure_message( + description="filepath not accepted", + plugin=plugin) continue - try: if self.is_http: plugin.transform( @@ -77,15 +107,20 @@ def handle(self) -> bool: else: plugin.transform(self.filepath) except Exception as err: - msg = f'file {self.filepath} failed to transform: {err}' + msg = f'Failed to transform file {self.filepath} : {err}' LOGGER.warning(msg) + self.publish_failure_message( + description="failed to transform file", + plugin=plugin) return False - try: plugin.publish() except Exception as err: - msg = f'file {self.filepath} failed to publish: {err}' + msg = f'Failed to publish file {self.filepath}: {err}' LOGGER.warning(msg) + self.publish_failure_message( + decription="Failed to publish file to api-backend", + plugin=plugin) return False return True