Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated metrics collecting through mqtt #306

Merged
merged 9 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 38 additions & 32 deletions docker/grafana/dashboards/home.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 1,
"id": 2,
"links": [],
"liveNow": false,
"panels": [
Expand All @@ -45,7 +45,7 @@
"axisSoftMin": 0,
"barAlignment": 0,
"drawStyle": "bars",
"fillOpacity": 33,
"fillOpacity": 100,
"gradientMode": "none",
"hideFrom": {
"legend": false,
Expand Down Expand Up @@ -88,13 +88,14 @@
"overrides": []
},
"gridPos": {
"h": 6,
"w": 12,
"h": 7,
"w": 15,
"x": 0,
"y": 0
},
"id": 2,
"interval": "1m",
"interval": "5m",
"maxDataPoints": 3000,
"options": {
"legend": {
"calcs": [],
Expand All @@ -114,25 +115,26 @@
},
"editorMode": "code",
"exemplar": true,
"expr": "delta(minio_bucket_usage_object_total{bucket=\"wis2box-incoming\"}[1h])",
"interval": "",
"legendFormat": "Received files per hour in wis2box-incoming",
"expr": "delta(wis2box_storage_incoming_total{}[$__interval])",
"interval": "5m",
"legendFormat": "New/updated files per hour in wis2box-incoming",
"range": true,
"refId": "A"
}
],
"title": "Received files per hour in wis2box-incoming",
"title": "New/updated files in wis2box-incoming storage",
"type": "timeseries"
},
{
"datasource": {
"type": "loki",
"uid": "P55348B596EBB51C3"
},
"description": "",
"gridPos": {
"h": 18,
"h": 21,
"w": 9,
"x": 12,
"x": 15,
"y": 0
},
"id": 8,
Expand All @@ -152,12 +154,15 @@
"type": "loki",
"uid": "wis2box-loki"
},
"expr": "{compose_service=\"wis2box\"}",
"editorMode": "builder",
"expr": "{compose_service=\"wis2box\"} |~ `ERROR`",
"hide": false,
"legendFormat": "",
"queryType": "range",
"refId": "B"
}
],
"title": "wis2box logging",
"title": "wis2box ERRORs",
"type": "logs"
},
{
Expand All @@ -176,7 +181,7 @@
"axisSoftMax": 5,
"barAlignment": 0,
"drawStyle": "bars",
"fillOpacity": 30,
"fillOpacity": 100,
"gradientMode": "none",
"hideFrom": {
"legend": false,
Expand Down Expand Up @@ -219,13 +224,13 @@
"overrides": []
},
"gridPos": {
"h": 6,
"w": 12,
"h": 7,
"w": 15,
"x": 0,
"y": 6
"y": 7
},
"id": 6,
"interval": "1m",
"interval": "5m",
"options": {
"legend": {
"calcs": [],
Expand All @@ -245,14 +250,14 @@
},
"editorMode": "code",
"exemplar": true,
"expr": "delta(minio_bucket_usage_object_total{bucket=\"wis2box-public\"}[1h])",
"interval": "",
"legendFormat": "New files per hour in wis2box-public",
"expr": "delta(wis2box_storage_public_total{}[$__interval])",
"interval": "5m",
"legendFormat": "New/updated files in wis2box-public",
"range": true,
"refId": "A"
}
],
"title": "New files per hour in wis2box-public",
"title": "New/updated files in wis2box-public storage",
"type": "timeseries"
},
{
Expand All @@ -271,7 +276,7 @@
"axisSoftMax": 5,
"barAlignment": 0,
"drawStyle": "bars",
"fillOpacity": 36,
"fillOpacity": 100,
"gradientMode": "none",
"hideFrom": {
"legend": false,
Expand Down Expand Up @@ -314,13 +319,13 @@
"overrides": []
},
"gridPos": {
"h": 6,
"w": 12,
"h": 7,
"w": 15,
"x": 0,
"y": 12
"y": 14
},
"id": 10,
"interval": "1m",
"interval": "5m",
"options": {
"legend": {
"calcs": [],
Expand All @@ -340,25 +345,26 @@
},
"editorMode": "code",
"exemplar": true,
"expr": "delta(mqtt_msg_count_total[1h])",
"interval": "",
"legendFormat": "mqtt messages published per hour",
"expr": "delta(wis2box_notify_total[$__interval])",
"interval": "5m",
"legendFormat": "WIS2.0 notifications",
"range": true,
"refId": "A"
}
],
"title": "mqtt messages published per hour",
"title": "Number of WIS2.0 notifications published by wis2box",
"type": "timeseries"
}
],
"refresh": "5s",
"schemaVersion": 36,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-2d",
"from": "now-6h",
"to": "now"
},
"timepicker": {},
Expand Down
74 changes: 47 additions & 27 deletions docker/mqtt_metrics_collector/mqtt_metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
import paho.mqtt.client as mqtt
import random

from urllib.parse import urlparse

import sys
import json

from prometheus_client import start_http_server, Counter

Expand Down Expand Up @@ -57,7 +56,6 @@
def sub_connect(client, userdata, flags, rc, properties=None):
"""
function executed 'on_connect' for paho.mqtt.client
subscribes to origin/#

:param client: client-object associated to 'on_connect'
:param userdata: userdata
Expand All @@ -69,15 +67,26 @@ def sub_connect(client, userdata, flags, rc, properties=None):
"""

logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}")
for s in ["origin/#"]:
for s in ["wis2box/#", "wis2box-storage/#"]:
client.subscribe(s, qos=1)


mqtt_msg_counter = Counter('mqtt_msg_count',
'Nr of messages seen on MQTT')
mqtt_msg_topic_counter = Counter('mqtt_msg_count_topic',
'Nr of messages seen on MQTT, by topic',
["topic"])
notify_total = Counter('wis2box_notify_total',
'Total notifications sent by wis2box')
notify_topic_wsi_total = Counter('wis2box_notify_topic_wsi_total',
'Total notifications sent by wis2box, by topic and WSI', # noqa
["topic", "WSI"])

failure_total = Counter('wi2box_failure_total',
'Total failed actions reported by wis2box')
failure_descr_wsi_total = Counter('wis2box_failure_detail_total',
'Total failed actions sent by wis2box, by description and WSI', # noqa
["description", "WSI"])

storage_incoming_total = Counter('wis2box_storage_incoming_total',
'Total storage notifications received on incoming') # noqa
storage_public_total = Counter('wis2box_storage_public_total',
'Total storage notifications received on public') # noqa


def sub_mqtt_metrics(client, userdata, msg):
Expand All @@ -91,10 +100,27 @@ def sub_mqtt_metrics(client, userdata, msg):

:returns: `None`
"""
# m = json.loads(msg.payload.decode('utf-8'))
logger.info(f"Received message on topic ={msg.topic}")
mqtt_msg_topic_counter.labels(msg.topic).inc(1)
mqtt_msg_counter.inc(1)
m = json.loads(msg.payload.decode('utf-8'))
logger.info(f"Received message on topic={msg.topic}")

if str(msg.topic).startswith('wis2box/notifications'):
notify_topic_wsi_total.labels(
m['topic'], m['wigos_station_identifier']).inc(1)
notify_total.inc(1)

if str(msg.topic).startswith('wis2box/failure'):
descr = m['description']
wsi = 'none'
if 'wigos_station_identifier' in m:
wsi = m['wigos_station_identifier']
failure_descr_wsi_total.labels(descr, wsi).inc(1)
failure_total.inc(1)

if str(msg.topic).startswith('wis2box-storage'):
if str(m["Key"]).startswith('wis2box-incoming'):
storage_incoming_total.inc(1)
if str(m["Key"]).startswith('wis2box-public'):
storage_public_total.inc(1)


def gather_mqtt_metrics():
Expand All @@ -103,30 +129,24 @@ def gather_mqtt_metrics():

:returns: `None`
"""
# explicitly set the counter to 0 at the start
mqtt_msg_counter.inc(0)

broker_url = urlparse(os.environ.get('WIS2BOX_BROKER_PUBLIC', ''))
# connect to the internal broker
broker_host = os.environ.get('WIS2BOX_BROKER_HOST', '')
broker_username = os.environ.get('WIS2BOX_BROKER_USERNAME', '')
broker_password = os.environ.get('WIS2BOX_BROKER_PASSWORD', '')
broker_port = int(os.environ.get('WIS2BOX_BROKER_PORT', '1883'))

# generate a random clientId for the mqtt-session
r = random.Random()
client_id = f"mqtt_metrics_collector_{r.randint(1,1000):04d}"
try:
logger.info("setup connection")
logger.info(f"host={broker_url.hostname}, user={broker_url.username}")
logger.info(f"host={broker_host}, user={broker_username}")
client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5)
client.on_connect = sub_connect
client.on_message = sub_mqtt_metrics
client.username_pw_set(broker_url.username, broker_url.password)
_port = broker_url.port
if _port is None:
if broker_url.scheme == 'mqtts':
_port = 8883
else:
_port = 1883
if broker_url.scheme == 'mqtts':
client.tls_set(tls_version=2)
client.connect(broker_url.hostname, _port)
client.username_pw_set(broker_username, broker_password)
client.connect(broker_host, broker_port)
client.loop_forever()
except Exception as e:
logger.error(f"Failed to setup MQTT-client with error: {e}")
Expand Down
32 changes: 30 additions & 2 deletions wis2box/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
#
###############################################################################

import json
import logging
from pathlib import Path
from typing import Iterator, Union

from wis2box.api import upsert_collection_item
from wis2box.env import (STORAGE_INCOMING, STORAGE_PUBLIC,
STORAGE_SOURCE, BROKER_PUBLIC)
STORAGE_SOURCE, BROKER_PUBLIC,
BROKER_HOST, BROKER_USERNAME, BROKER_PASSWORD,
BROKER_PORT)
from wis2box.storage import put_data
from wis2box.topic_hierarchy import TopicHierarchy
from wis2box.plugin import load_plugin, PLUGINS
Expand Down Expand Up @@ -58,9 +61,27 @@ def __init__(self, defs: dict) -> None:
self.output_data = {}
self.discovery_metadata = {}

# load plugin for local broker
defs2 = {
'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}", # noqa
'client_type': 'notify-publisher'
}
self.local_broker = load_plugin('pubsub', defs2)

# if discovery_metadata:
# self.setup_discovery_metadata(discovery_metadata)

def publish_failure_message(self, description, wsi=None):
message = {
'filepath': self.incoming_filepath,
'description': description
}
if wsi is not None:
message['wigos_station_identifier'] = wsi
# publish message
self.local_broker.pub('wis2box/failure', json.dumps(message))

def setup_discovery_metadata(self, discovery_metadata: dict) -> None:
"""
Import discovery metadata
Expand Down Expand Up @@ -131,7 +152,7 @@ def notify(self, identifier: str, storage_path: str,
geometry,
wigos_station_identifier)

# load plugin for broker
# load plugin for public broker
defs = {
'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
'url': BROKER_PUBLIC,
Expand All @@ -143,6 +164,13 @@ def notify(self, identifier: str, storage_path: str,
broker.pub(topic, wis_message.dumps())
LOGGER.info(f'WISNotificationMessage published for {identifier}')

# publish message for internal monitoring
notify_msg = {
'topic': topic,
'wigos_station_identifier': wigos_station_identifier
}
self.local_broker.pub('wis2box/notifications', json.dumps(notify_msg))

LOGGER.debug('Pushing message to API')
upsert_collection_item('messages', wis_message.message)

Expand Down
Loading