Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alert update to include location #1917

Merged
merged 6 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cspell/custom-dictionary-workspace.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Fingerbot
firstparty
fline
FLOMASTA
Forestfire
foxess
futurerate
gecloud
Expand Down
232 changes: 232 additions & 0 deletions apps/predbat/alertfeed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
# -----------------------------------------------------------------------------
# Predbat Home Battery System
# Copyright Trefor Southwell 2024 - All Rights Reserved
# This application maybe used for personal use only and not for commercial use
# -----------------------------------------------------------------------------
# fmt off
# pylint: disable=consider-using-f-string
# pylint: disable=line-too-long
# pylint: disable=attribute-defined-outside-init

import requests
import re
from datetime import datetime, timedelta
from config import TIME_FORMAT, TIME_FORMAT_OCTOPUS
from utils import str2time, minutes_to_time, dp1, dp2, dp4
import xml.etree.ElementTree as etree


class Alertfeed:
def process_alerts(self):
"""
Process the alerts from the alert feed
"""

self.alerts = []
self.alert_active_keep = {}

alerts = self.get_arg("alerts", {})
if not alerts:
return
if not isinstance(alerts, dict):
self.log("Warn: Alerts must be a dictionary, ignoring")
return

latitude = self.get_state_wrapper("zone.home", attribute="latitude")
longitude = self.get_state_wrapper("zone.home", attribute="longitude")
if latitude and longitude:
self.log("Processing alerts for approx position latitude {} longitude {}".format(dp1(latitude), dp1(longitude)))

alert_url = alerts.get("url", "https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-united-kingdom")
area = alerts.get("area", "")
event = alerts.get("event", "")
severity = alerts.get("severity", "")
certainty = alerts.get("certainty", "")
urgency = alerts.get("urgency", "")
keep = alerts.get("keep", 100)

self.log("Info: Processing alerts from {}".format(alert_url))
alert_xml = self.download_alert_data(alert_url)
if alert_xml:
self.alerts = self.parse_alert_data(alert_xml)
self.alerts = self.filter_alerts(self.alerts, area, event, severity, certainty, urgency, latitude, longitude)
self.alert_active_keep = self.apply_alerts(self.alerts, keep)

def apply_alerts(self, alerts, keep):
"""
Apply the alerts to the active alert list
"""
alert_active_keep = {}
active_alert_text = ""
active_alert = False

if alerts:
for alert in alerts:
onset = alert.get("onset", None)
expires = alert.get("expires", None)
severity = alert.get("severity", "")
certainty = alert.get("certainty", "")
urgency = alert.get("urgency", "")
area = alert.get("areaDesc", "")

if onset and expires:
onset_minutes = int((onset - self.midnight_utc).total_seconds() / 60)
expires_minutes = int((expires - self.midnight_utc).total_seconds() / 60)
if expires_minutes >= self.minutes_now:
self.log("Info: Active alert: {} severity {} certainty {} urgency {} from {} to {} applying keep {}".format(alert.get("event"), severity, certainty, urgency, onset, expires, keep))
for minute in range(onset_minutes, expires_minutes):
if minute not in alert_active_keep:
alert_active_keep[minute] = keep
else:
alert_active_keep[minute] = max(alert_active_keep[minute], keep)
if minute == self.minutes_now:
active_alert_text = alert.get("event") + " until " + str(expires)
active_alert = True

alert_keep = alert_active_keep.get(self.minutes_now, 0)
alert_show = []
for alert in alerts:
item = {}
item["event"] = alert.get("event", "")
item["severity"] = alert.get("severity", "")
item["certainty"] = alert.get("certainty", "")
item["urgency"] = alert.get("urgency", "")
item["area"] = alert.get("areaDesc", "")
item["onset"] = str(alert.get("onset", ""))
item["expires"] = str(alert.get("expires", ""))
item["title"] = alert.get("title", "")
item["status"] = alert.get("status", "")
alert_show.append(item)
self.dashboard_item(self.prefix + ".alerts", state=active_alert_text, attributes={"friendly_name": "Weather alerts", "icon": "mdi:alert-outline", "keep": alert_keep, "alerts": alert_show})

return alert_active_keep

def is_point_in_polygon(self, lat, lon, polygon):
"""
Determines if a given point is inside a polygon.

Parameters:
lat (float): Latitude of the point.
lon (float): Longitude of the point.
polygon (list of tuples): List of (latitude, longitude) tuples defining the polygon.

Returns:
bool: True if the point is inside the polygon, False otherwise.
"""
num_vertices = len(polygon)
inside = False

# Loop through each edge of the polygon
for i in range(num_vertices):
lat1, lon1 = polygon[i]
lat2, lon2 = polygon[(i + 1) % num_vertices]

# Check if the point is on the boundary
if (lat == lat1 and lon == lon1) or (lat == lat2 and lon == lon2):
return True

# Check if the edge crosses the ray
if ((lon > lon1) != (lon > lon2)) and (lat < (lat2 - lat1) * (lon - lon1) / (lon2 - lon1) + lat1):
inside = not inside

return inside

def filter_alerts(self, alerts, area=None, event=None, severity=None, certainty=None, urgency=None, latitude=None, longitude=None):
# Filter alerts by area, event, severity, certainty, and urgency
result = []
for alert in alerts:
if area:
areaDesc = alert.get("areaDesc", [])
areas = areaDesc.split("|")
match = False
for check_area in areas:
if area and re.search(area.lower(), check_area.lower()):
match = True
if not match:
continue
if event and not re.search(event.lower(), alert.get("event", "").lower()):
continue
if severity and not re.search(severity.lower(), alert.get("severity", "").lower()):
continue
if certainty and not re.search(certainty.lower(), alert.get("certainty", "").lower()):
continue
if urgency and not re.search(urgency.lower(), alert.get("urgency", "").lower()):
continue

if latitude and longitude:
polygon_text = alert.get("polygon", "")
polygon = []

# Polygon is a list of lat/lon pairs
if polygon_text:
polygon_arr = polygon_text.split()
for point in polygon_arr:
try:
lat, lon = point.split(",")
polygon.append((float(lat), float(lon)))
except (ValueError, TypeError):
pass

# Check if the alert is relevant to our location
if polygon:
# Check if our location is within the polygon
if not self.is_point_in_polygon(latitude, longitude, polygon):
continue

result.append(alert)
return result

def download_alert_data(self, url):
"""
Download octopus free session data directly from a URL
"""
# Check the cache first
now = datetime.now()
if url in self.alert_cache:
stamp = self.alert_cache[url]["stamp"]
pdata = self.alert_cache[url]["data"]
age = now - stamp
if age.seconds < (30 * 60):
self.log("Return cached octopus data for {} age {} minutes".format(url, dp1(age.seconds / 60)))
return pdata

r = requests.get(url)
if r.status_code not in [200, 201]:
self.log("Warn: Error downloading Octopus data from URL {}, code {}".format(url, r.status_code))
self.record_status("Warn: Error downloading Octopus free session data", debug=url, had_errors=True)
return None

# Return new data
self.alert_cache[url] = {}
self.alert_cache[url]["stamp"] = now
self.alert_cache[url]["data"] = r.text
return r.text

def parse_alert_data(self, xml):
"""
Parse the alert data from the XML
"""
namespace = "{http://www.w3.org/2005/Atom}"
namespace2 = "{urn:oasis:names:tc:emergency:cap:1.2}"
alerts = []
root = None
try:
root = etree.fromstring(xml)
except Exception as e:
self.log("Warn: Failed to extract alerts from xml data exception: {}".format(e))

if root:
for entry in root:
alert = {}
if entry.tag == f"{namespace}entry":
for child in entry:
tag_name = child.tag.replace(namespace, "").replace(namespace2, "")
tag_value = child.text
if tag_name in ["effective", "expires", "onset", "sent", "published", "updated"]:
try:
tag_value = str2time(tag_value)
except (ValueError, TypeError):
tag_value = None
alert[tag_name] = tag_value
alerts.append(alert)
return alerts
9 changes: 4 additions & 5 deletions apps/predbat/config/apps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -492,14 +492,13 @@ pred_bat:
#futurerate_peak_premium_import: 14
#futurerate_peak_premium_export: 6.5

# Alert feeds - customise to your region and desired risk/events
# Alert feeds - customise to your country and the alert types, severity and keep value
#alerts:
# url: "https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-united-kingdom"
# area: "North West England"
# event: "Amber|Yellow|Orange|Red"
# event: "(Amber|Yellow|Orange|Red).*(Wind|Snow|Fog|Rain|Thunderstorm|Avalanche|Frost|Heat|Coastal event|Flood|Forestfire|Ice|Low temperature|Storm|Tornado|Tsunami|Volcano|Wildfire)"
# severity: "Moderate|Severe|Extreme"
# certainty: "Possible|Likely|Expected"
# keep: 100
# certainty: "Likely|Expected"
# keep: 40

# Watch list, a list of sensors to watch for changes and then update the plan if they change
# This is useful for things like the Octopus Intelligent Slot sensor so that the plan update as soon as you plugin in
Expand Down
1 change: 1 addition & 0 deletions apps/predbat/predbat.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ def reset(self):
self.battery_temperature_prediction = {}
self.alerts = []
self.alert_active_keep = {}
self.alert_cache = {}

self.config_root = "./"
for root in CONFIG_ROOTS:
Expand Down
26 changes: 25 additions & 1 deletion apps/predbat/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6992,10 +6992,16 @@ def test_alert_feed(my_predbat):
Test the alert feed
"""
failed = 0
ha = my_predbat.ha_interface
today = datetime.now().strftime("%Y-%m-%d")
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")

birmingham = [52.4823, -1.8900]
bristol = [51.4545, -2.5879]
manchester = [53.4808, -2.2426]
fife = [56.2082, -3.1495]

alert_data = f"""<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" xmlns:cap="urn:oasis:names:tc:emergency:cap:1.2">
<link href="https://pubsubhubbub.appspot.com/" rel="hub"/>
Expand Down Expand Up @@ -7088,13 +7094,25 @@ def test_alert_feed(my_predbat):
failed = 1
return failed

filter = my_predbat.filter_alerts(result, latitude=birmingham[0], longitude=birmingham[1])
if len(filter) != 0:
print("ERROR: Expecting 0 alert for Birmingham got {}".format(len(filter)))
failed = 1
return failed

filter = my_predbat.filter_alerts(result, latitude=fife[0], longitude=fife[1])
if len(filter) != 1:
print("ERROR: Expecting 1 alert for Fife got {}".format(len(filter)))
failed = 1
return failed

filter = my_predbat.filter_alerts(result, area="Grampian", severity="Moderate|Severe", certainty="Likely")
if len(filter) != 1:
print("ERROR: Expecting 1 alert for Grampian got {}".format(len(filter)))
failed = 1
return failed

filter = my_predbat.filter_alerts(result, event="Yellow|Amber")
filter = my_predbat.filter_alerts(result, event="(Amber|Yellow|Orange|Red).*(Wind|Snow|Fog|Thunderstorm|Avalanche|Frost|Heat|Coastal event|Flood|Forestfire|Ice|Low temperature|Storm|Tornado|Tsunami|Volcano|Wildfire)")
if len(filter) != 2:
print("ERROR: Expecting 2 alerts for Yellow|Amber but got {}".format(len(filter)))
failed = 1
Expand Down Expand Up @@ -7524,6 +7542,12 @@ def test_alert_feed(my_predbat):
print("ERROR: Expecting show should be {} got {}".format(expect_show, show))
failed = 1

alert_text = ha.get_state(my_predbat.prefix + ".alerts")
expect_text = "Yellow wind warning until " + today + " 23:59:59+00:00"
if alert_text != expect_text:
print("ERROR: Expecting alert text to be '{}' got '{}'".format(expect_text, alert_text))
failed = 1

return failed


Expand Down
Loading