Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance sensor class structure #84

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ repos:
rev: "v1.4.1"
hooks:
- id: mypy
additional_dependencies: [homeassistant-stubs, voluptuous-stubs, types-python-dateutil]
additional_dependencies: [homeassistant-stubs, voluptuous-stubs, types-python-dateutil, types-PyYAML]
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.0.278
Expand Down
121 changes: 79 additions & 42 deletions custom_components/feedparser/sensor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Feedparser sensor."""
from __future__ import annotations

import email.utils
import logging
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING

import feedparser # type: ignore[import]
Expand Down Expand Up @@ -32,16 +33,18 @@
CONF_EXCLUSIONS = "exclusions"
CONF_SHOW_TOPN = "show_topn"

DEFAULT_DATE_FORMAT = "%a, %b %d %I:%M %p"
DEFAULT_SCAN_INTERVAL = timedelta(hours=1)
DEFAULT_THUMBNAIL = "https://www.home-assistant.io/images/favicon-192x192-full.png"
DEFAULT_TOPN = 9999

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_FEED_URL): cv.string,
vol.Required(CONF_DATE_FORMAT, default="%a, %b %d %I:%M %p"): cv.string,
vol.Required(CONF_DATE_FORMAT, default=DEFAULT_DATE_FORMAT): cv.string,
vol.Optional(CONF_LOCAL_TIME, default=False): cv.boolean,
vol.Optional(CONF_SHOW_TOPN, default=9999): cv.positive_int,
vol.Optional(CONF_SHOW_TOPN, default=DEFAULT_TOPN): cv.positive_int,
vol.Optional(CONF_INCLUSIONS, default=[]): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_EXCLUSIONS, default=[]): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL): cv.time_period,
Expand Down Expand Up @@ -114,45 +117,79 @@ def update(self: FeedParserSensor) -> None:
if len(parsed_feed.entries) > self._show_topn
else len(parsed_feed.entries)
)
self._entries = []

for entry in parsed_feed.entries[: self._attr_state]:
entry_value = {}

for key, value in entry.items():
if (
(self._inclusions and key not in self._inclusions)
or ("parsed" in key)
or (key in self._exclusions)
):
continue
if key in ["published", "updated", "created", "expired"]:
time: datetime = parser.parse(value)
if self._local_time:
time = dt.as_local(time)
entry_value[key] = time.strftime(self._date_format)
else:
entry_value[key] = value

if "image" in self._inclusions and "image" not in entry_value.keys():
if "enclosures" in entry:
images = [
enc
for enc in entry["enclosures"]
if enc.type.startswith("image/")
]
else:
images = []
if images:
entry_value["image"] = images[0][
"href"
] # pick the first image found
else:
entry_value[
"image"
] = DEFAULT_THUMBNAIL # use default image if no image found

self._entries.append(entry_value)
self._entries.extend(self._generate_entries(parsed_feed))

def _generate_entries(
self: FeedParserSensor, parsed_feed: FeedParserDict
) -> list[dict[str, str]]:
return [
self._generate_sensor_entry(feed_entry)
for feed_entry in parsed_feed.entries[
: self.native_value # type: ignore[misc]
]
]

def _generate_sensor_entry(
self: FeedParserSensor, feed_entry: FeedParserDict
) -> dict[str, str]:
sensor_entry = {}
for key, value in feed_entry.items():
if (
(self._inclusions and key not in self._inclusions)
or ("parsed" in key)
or (key in self._exclusions)
):
continue
if key in ["published", "updated", "created", "expired"]:
parsed_date: datetime = self._parse_date(value)
sensor_entry[key] = parsed_date.strftime(self._date_format)
else:
sensor_entry[key] = value

self._process_image(feed_entry, sensor_entry)

return sensor_entry

def _parse_date(self: FeedParserSensor, date: str) -> datetime:
try:
parsed_time: datetime = email.utils.parsedate_to_datetime(date)
except ValueError:
_LOGGER.warning(
(
"Unable to parse RFC-822 date from %s. "
"This could be caused by incorrect pubDate format "
"in the RSS feed or due to a leapp second"
),
date,
)
parsed_time = parser.parse(date)
if not parsed_time.tzname():
# replace tzinfo with UTC offset if tzinfo does not contain a TZ name
parsed_time = parsed_time.replace(
tzinfo=timezone(parsed_time.utcoffset()) # type: ignore[arg-type]
)
if self._local_time:
parsed_time = dt.as_local(parsed_time)
return parsed_time

def _process_image(
self: FeedParserSensor, feed_entry: FeedParserDict, sensor_entry: dict[str, str]
) -> None:
if "image" in self._inclusions and "image" not in sensor_entry.keys():
if "enclosures" in feed_entry:
images = [
enc
for enc in feed_entry["enclosures"]
if enc.type.startswith("image/")
]
else:
images = []
if images:
sensor_entry["image"] = images[0]["href"] # pick the first image found
else:
sensor_entry[
"image"
] = DEFAULT_THUMBNAIL # use default image if no image found

@property
def extra_state_attributes(self: FeedParserSensor) -> dict[str, list]:
Expand Down