Receiving and processing MQTT messages from Frigate NVR #632
Dear sev, thanks for writing in. I am not 100% sure, but I think the value of the You may want to play around with this command line incantation to see if "title" and "message" will get propagated properly.

mqttwarn --plugin=apprise \
  --config='{"baseuri": "ntfy://user:[email protected]/topic1/topic2"}' \
  --options='{"addrs": [], "title": "Example notification", "message": "Hello world"}'

Do I understand you correctly that putting something into the message body would fit your needs? Otherwise, please let me know if you are intentionally looking into putting something into the message's attachment instead. This may require a different approach, but is well worth exploring. With kind regards, |
Sorry, I was speaking in reference to the Ntfy API. You can upload attachments via PUT request body. With Apprise, I think the only way to send an attachment is via URI, so you can disregard that. I really thought that there was another way to do it with Apprise, but I may have been mistaken. For the remaining issue, filtering does not appear to work, so the unmodified JSON payload is displayed for non-matching messages, unless I am just using it wrong. |
Hi again, it should work to submit attachments via mqttwarn => Apprise => Ntfy. However, we have not tested it, so it will be sweet to explore this detail together. The "Parameter Breakdown" section of the corresponding documentation of Apprise's Ntfy plugin tells us:
-- https://github.com/caronc/apprise/wiki/Notify_ntfy#parameter-breakdown So, maybe you are already successful with a dry-run snippet like this, using mqttwarn --plugin=apprise \
--config='{"baseuri": "ntfy://user:[email protected]/topic1/topic2?attach=https://httpbin.org/ip"}' \
--options='{"addrs": [], "title": "Example notification", "message": "Hello world"}' If you can validate that any of those options to use the With kind regards, |
Ah, you actually don't want to forward an external resource as attachment, but use the return value of what you produce in Or do you want to use |
The image is sent in another MQTT payload, so it first needs to be ingressed by the I am confident I can get that to work, and I have tested attachments via URI before (only with external resources fetched with HTTP however) so my main concern now is why the filter function isn't working, unless you believe there is some work that could be done to streamline the attachment process that should be focused on. |
Oh I see. So you will have to introduce a kind of stateful processing for that? Will be fun!
I also think it is not possible. Maybe @caronc has different things to say about this?
Yeah, it will probably need some trial-and-error. Please let me know if you find any blockers.
My thoughts would have been around how to properly propagate an "attachment" parameter from the inbound MQTT message payload or topic into the transformation dict and then into the outbound parameters to the Apprise/Ntfy plugin. But after learning you want to submit the whole inbound message itself as an attachment, I don't think this was actually your need, right?
All right, let's focus on this detail now. |
We do indeed have a problem with the first part that I was hoping would be easy :) #634 The plan was to save the snapshot to file and provide a proxied URL (or |
I am just now getting a bit more into the details while I am looking into the snippets you provided. I will add them to the
That's sad. I was hoping that Frigate would provide a URL to the snapshot file within the "event" message already. But I see that Frigate itself has no capability to store those snapshot files anywhere, so it has no other option than submitting their payloads via MQTT, right? 1 We had two other discussions about submitting images in the past, but I think your scenario is new.
Don't fret. Unless something serious blocks this endeavour, I think we can make it work. Footnotes
|
While getting the code snippet to my machine, my editor's code highlighting immediately reveals why it is probably not working: Line 34 will always assign |
Sure. I will just add it to a branch first, so that you can use and test mqttwarn from there, including any improvements we will make while we go.
Sure!
I wouldn't advise on using S3 at AWS, if this is what your concerns would be about. I was looking at MinIO here instead, which offers an S3-compatible interface. But if there is no MQTT message which will point out the URL, correlated with the event you are looking at, this is pointless. Those are some corresponding items on the issue tracker of Frigate. It looks like all of them stalled, I am just referencing them for the sake of completeness, before moving on.
|
Yep.
I think the path can be constructed based on the payload. Frigate saves snapshots (previously clips) to |
That would be excellent, so mqttwarn would not need to process the image at all? So, the "after": {
"id": "1680791459.255384-abcdef",
"camera": "camera",
} In this way, it would be a matter to find out how those parameters could be interpolated into the configured Apprise URL, right? |
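The interpolation step discussed here could look roughly like the following minimal sketch. It assumes the event's "after" dictionary is available as a Python dict. The URL template, the host proxy.example.org, and the function name snapshot_url are hypothetical and only serve as illustration. Only the "id" and "camera" keys come from the payload fragment above.

```python
import json
from urllib.parse import quote

# Hypothetical URL template: host and path layout are assumptions for illustration.
SNAPSHOT_URL_TEMPLATE = "https://proxy.example.org/snapshots/{camera}-{id}.jpg"


def snapshot_url(payload: str, template: str = SNAPSHOT_URL_TEMPLATE) -> str:
    """Interpolate `camera` and `id` from the event's `after` dict into a URL."""
    after = json.loads(payload)["after"]
    # Percent-encode the values so unexpected characters cannot break the URL.
    return template.format(
        camera=quote(str(after["camera"]), safe=""),
        id=quote(str(after["id"]), safe=""),
    )


event = '{"after": {"id": "1680791459.255384-abcdef", "camera": "camera"}}'
print(snapshot_url(event))
```

Whether such a URL is reachable at notification time depends on when Frigate actually writes the snapshot, which this sketch does not address.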
I've just added the code and configuration snippets you provided with GH-635. When running the example as outlined at README » Usage, I am getting a huge error output. Those are the first few lines with significant content. 2023-04-12 19:26:08,185 DEBUG [mqttwarn.core ] Message received on frigate/events: {
2023-04-12 19:26:08,185 DEBUG [mqttwarn.core ] Section [frigate/events] matches message on frigate/events, processing it
2023-04-12 19:26:08,185 DEBUG [mqttwarn.context ] filter ******************************************
2023-04-12 19:26:08,186 WARNING [mqttwarn.context ] Cannot invoke filter function 'frigate_events_filter' defined in 'frigate/events': Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
2023-04-12 19:26:08,186 DEBUG [mqttwarn.context ] events ******************************************
2023-04-12 19:26:08,186 WARNING [mqttwarn.context ] Cannot invoke alldata function 'frigate_events' defined in 'frigate/events': Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
2023-04-12 19:26:08,186 DEBUG [mqttwarn.core ] Cannot decode JSON object, payload={: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
[...]
2023-04-12 19:18:30,233 ERROR [mqttwarn.core ] Formatting message with function '{title}' failed
Traceback (most recent call last):
File "/path/to/mqttwarn/mqttwarn/core.py", line 357, in xform
res = Formatter().format(function, **transform_data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[...]
KeyError: 'title' |
Oh I see. Such errors are apparently still hard to debug. It will be easier when tweaking diff --git a/mqttwarn/context.py b/mqttwarn/context.py
index 1c1508e..ff1fa7f 100644
--- a/mqttwarn/context.py
+++ b/mqttwarn/context.py
@@ -62,6 +62,7 @@ class RuntimeContext:
try:
return self.invoker.filter(filterfunc, topic, payload, section)
except Exception as e:
+ raise
logger.warning("Cannot invoke filter function '%s' defined in '%s': %s" % (filterfunc, section, e))
return False It gives us the following output then. 2023-04-12 19:32:03,074 DEBUG [mqttwarn.core ] Message received on frigate/events: {
2023-04-12 19:32:03,075 DEBUG [mqttwarn.core ] Section [frigate/events] matches message on frigate/events, processing it
2023-04-12 19:32:03,075 DEBUG [mqttwarn.context ] filter ******************************************
Traceback (most recent call last):
File "/path/to/mqttwarn/mqttwarn/core.py", line 643, in subscribe_forever
mqttc.loop_forever()
[...]
File "/path/to/mqttwarn/mqttwarn/context.py", line 63, in is_filtered
return self.invoker.filter(filterfunc, topic, payload, section)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/mqttwarn/mqttwarn/context.py", line 204, in filter
rc = func(topic, payload, section, self.srv) # new version
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "frigate.py", line 32, in frigate_events_filter
[...]
File "/path/to/mqttwarn/.venv311/lib/python3.11/site-packages/paho/mqtt/client.py", line 3570, in _handle_on_message
on_message(self, self._userdata, message)
File "/path/to/mqttwarn/mqttwarn/core.py", line 183, in on_message
if context.is_filtered(section, topic, payload):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/mqttwarn/mqttwarn/context.py", line 63, in is_filtered
return self.invoker.filter(filterfunc, topic, payload, section)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/mqttwarn/mqttwarn/context.py", line 204, in filter
rc = func(topic, payload, section, self.srv) # new version
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "frigate.py", line 32, in frigate_events_filter
message = json.loads(message)
^^^^^^^^^^^^^^^^^^^ So, apparently, assuming |
Oh, never mind, it's my fault. Still, it reveals a spot where developing custom mqttwarn logic is painful, because the root cause is hidden by suppressing the corresponding full stack trace. We should think about improving this situation. |
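One possible direction, sketched here with the standard library only: keep swallowing the exception so that message processing continues, but log it together with its traceback. This is not mqttwarn's actual code. The name call_filter and the logger name are hypothetical, and returning False on failure merely mirrors the behaviour visible in the diff above.

```python
import json
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("filter-demo")  # hypothetical logger name


def call_filter(func, topic, payload, section):
    """Invoke a user-defined filter; on failure, log the full traceback."""
    try:
        return bool(func(topic, payload, section))
    except Exception:
        # logger.exception() logs at ERROR level and appends the stack trace,
        # so the failing line inside the user's function stays visible.
        logger.exception(
            "Cannot invoke filter function %r defined in %r",
            getattr(func, "__name__", func), section,
        )
        return False


def broken_filter(topic, payload, section):
    # Raises json.JSONDecodeError for the truncated payload used below.
    return "type" not in json.loads(payload)


print(call_filter(broken_filter, "frigate/events", "{", "frigate/events"))
```

With this approach the log output contains the line in the user's function that raised, without needing to patch in a bare raise while debugging.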
Yes, I also ran into this kind of issue when I was attempting to write the custom functionality. I had to enable the very verbose and hard to read debug messages to even get an inkling of what was wrong, and had to run my code in a standalone interpreter to catch syntax errors. I also feel the documentation for writing Python functions could be improved as I found it hard to read and navigate, though that's another issue. I don't currently get any errors from running the |
That is correct. But if the image is not written synchronously, it could be possible the image does not exist when the MQTT event is sent. I have not looked into this yet. As long as the image is written before the MQTT event is, there should be no problem approaching it in this way. However, this does require that mqttwarn and Frigate be on the same system and/or mqttwarn have immediate access to Frigate's snapshots. Alternatively, the image can be hosted on Frigate behind a proxy, and a URL to that can be provided to Ntfy. Ntfy will download the image and store it in its attachment cache. This is less secure as there can be no authentication for the URL for Ntfy to download it. You could make the file path to contain a shared secret or algorithm between Frigate's reverse proxy and mqttwarn to help mitigate this. Or, you could block unexpected requests at the firewall level. |
I've fixed the example with de00c85. Now, it seems to process the event perfectly well.
Can you specifically outline what does not work for you? Should the |
You can do any of the following:
|
Regrettably, I don't think this will work. Frigate only writes thumbnail to disk after the detection has ended. So I will have to use the initial solution of relying on the I glanced at Frigate to try and figure out how they publish MQTT events, but I have so far not really been able to demystify it. The |
Thanks. With f35ccd1, I've added corresponding minified example messages, and adjusted the The implementation became slightly more verbose compared to what you have been attempting with the list comprehension, but I think it is a good thing, in order to provide better log messages to the dear user, which - just in case - is nothing less than an audit log. In this particular use case, it may become important to know exactly why your event notification has been ignored or skipped. |
Thanks for investigating. So be it. I will add corresponding code to the |
I saw in your latest commits with the docstring on the filter function, that Oops. |
May I ask in which order the messages are published to the broker? Does the event message come first, and the image snapshot message afterwards? |
That's what I was trying to figure out, I don't know yet. Based on what I am reading the snapshot is published when a detection moves to a true positive state, and from every event there on if the snapshot is considered better than the previous one. I am not sure which gets published first, I can observe to see. Here's my new filter function.
|
Oh, it's really easy to find out. Just subscribe to the MQTT broker on all topics. mosquitto_sub -v -h localhost -t '#' |
Right, I just haven't done it yet :) |
Thanks for sharing. I will add it to the feature branch. However, currently it skips both I really can't decipher the logic from the code, is your comment »only process messages that match schema and have zone changes« a thorough description about what you are trying to achieve? I think the first part is fine if (message.get('type', None) != 'end'
and (a := message.get('after', None))
and a.get('current_zones', None) != a.get('entered_zones', None)): but please let's provide a more "unfolded" variant of the second part to readers of the tutorial I am using your use-case for. x in a and (x == 'current_zones' or bool(a[x])) for x in
('false_positive', 'camera', 'label',
'current_zones', 'entered_zones', 'frame_time')) |
This code will always return

try:
    message = json.loads(message)
except:
    pass
else:
    # can't parse the message
    return True

This code will be better, but still it will not inform the user by emitting a corresponding log message.

try:
    message = json.loads(message)
except:
    # can't parse the message
    return True

However, after fixing this, "good" messages will still be skipped. I've validated that another flaw must be in the list comprehension comparison function. Please unfold it ;]. |
I am doing a few things at once so my code could have been better. Here it is, commented and with your suggestions. Both this and my previous post have been untested (hence why it filtered

try:
    message = json.loads(message)
except ValueError as e:
    srv.logging.warning(f"Can't parse Frigate event message: {e}")
    return True
# check the message type
t = message.get('type', None)
if t is None:
    srv.logging.warning('Frigate event: Missing message type')
    return True
# ignore ending messages
elif t == 'end':
    return True
# payload must have 'after' key
elif (a := message.get('after', None)) is None:
    srv.logging.warning("Frigate event: 'after' missing from payload")
    return True
# validate the 'after' dict contains the values we need
for x in ('false_positive', 'camera', 'label',
          'current_zones', 'entered_zones', 'frame_time'):
    # we can ignore if current_zones is empty, but all other keys should
    # be present and have values
    if x not in a or (x != 'current_zones' and not a[x]):
        srv.logging.warning(f"Frigate event: 'after.{x}' missing from payload or empty")
        return True
# if this is an update message, ignore it if zones haven't changed
# updates can happen for other reasons that we don't need to notify
if t == 'update' and a['current_zones'] == a['entered_zones']:
    return True
return False

This should have the same functionality as

if (message.get('type', None) != 'end'
and (a := message.get('after', None))
and (messages.get('type', None) == 'new'
or a.get('current_zones', None) != a.get('entered_zones', None)):
return False in (
x in a and (x == 'current_zones' or bool(a[x])) for x in
('false_positive', 'camera', 'label',
'current_zones', 'entered_zones', 'frame_time')) When I get home later tonight I will test all of this to make sure it does what I think it should, but I am fairly confident now. |
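For readers following along, here is a self-contained sketch of an "unfolded" filter in the same spirit. It is not the implementation from the mqttwarn repository. The function name, the logger name, and the choice of which keys must be non-empty are assumptions. One pitfall it avoids: a plain truthiness check on after.false_positive rejects every event where that field is False, so the boolean is handled separately here.

```python
import json
import logging

logger = logging.getLogger("frigate-filter-sketch")  # hypothetical logger name

REQUIRED_KEYS = ("false_positive", "camera", "label",
                 "current_zones", "entered_zones", "frame_time")
# Assumption: these keys must be present AND non-empty.
NON_EMPTY_KEYS = ("camera", "label", "entered_zones")


def frigate_events_filter_sketch(payload: str) -> bool:
    """Return True to skip (filter) the message, False to let it pass."""
    try:
        message = json.loads(payload)
    except ValueError as ex:
        logger.warning("Can't parse Frigate event message: %s", ex)
        return True
    if not isinstance(message, dict):
        logger.warning("Frigate event: payload is not a JSON object")
        return True

    event_type = message.get("type")
    after = message.get("after")
    if event_type is None or not isinstance(after, dict):
        logger.warning("Frigate event: 'type' or 'after' missing from payload")
        return True
    if event_type == "end":
        return True  # ignore ending messages

    for key in REQUIRED_KEYS:
        if key not in after:
            logger.warning("Frigate event: 'after.%s' missing from payload", key)
            return True
    for key in NON_EMPTY_KEYS:
        if not after[key]:
            logger.warning("Frigate event: 'after.%s' is empty", key)
            return True

    # `false_positive` is a boolean; False is the value we want to let through.
    if after["false_positive"]:
        return True
    # On updates, only notify when the zones differ.
    if event_type == "update" and after["current_zones"] == after["entered_zones"]:
        return True
    return False


example = {"type": "new", "after": {
    "false_positive": False, "camera": "frontyard", "label": "person",
    "current_zones": [], "entered_zones": ["lawn"], "frame_time": 1680791459.25}}
print(frigate_events_filter_sketch(json.dumps(example)))
```

Checking one field per step costs a few more lines than the list comprehension, but each skipped message gets its own log line naming the reason.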
You are not actually running the code? This makes sense now ;]. The code will not work because Let's please serialize validation rule evaluation, handle each field type individually, and provide good error messaging to the user and developer, so please build upon 58f716d when sending further code snippets 1. The implementation now succeeds to let Footnotes
|
c258840 tries to improve the situation in this regard. |
Hi again, I've unlocked receiving and processing MQTT messages with binary payloads, see #634 (comment), and also unlocked configuring a It may need some more adjustments, but it seems like the parameter propagation works up to the point where it actually notifies Apprise/Ntfy. There, it fails because I don't have a Ntfy daemon configured on my machine. I will certainly do, in order to complete the README accordingly, and maybe add a dedicated test case for the whole scenario, but maybe I can't find more time for that, so you may also step in to test the current implementation end-to-end? With kind regards, |
Wow, you've gone much farther with this than I ever considered. I was perfectly content with my rinky-dink function (that I neglected to test and was filled with issues...), but this implementation is much more solid and effective. I have nothing to add to your class structure or control flow, it all looks good. I left some feedback on the PR. You have also reminded me that a second argument of |
Hi again, excellent that it works for you. We will bring in a few more improvements with GH-635, as discussed. Independently of that, while we are currently enjoying a good collaboration, I would like to ask for your feedback about the documentation modernizations happening on behalf of GH-389 and GH-636, see https://mqttwarn.readthedocs.io/. With kind regards, |
Hi again, I've improved the corresponding implementations needed to support your use case, and I will be happy to receive any kind of feedback. With kind regards, References |
Dear @sevmonster, GH-638 has been merged, and now there is also much progress with GH-639. Specifically, I've just added 2433cd4, which synchronizes JSON event and snapshot image receive order, so that things can be controlled precisely. Together with a few other improvements, for example that the configuration of text message templates has been pulled into the configuration file, the configuration now looks like this 1:

[config:ntfy]
targets = {
    'test': {
        'url': 'http://username:password@localhost:5555/frigate-testdrive',
        'file': '/tmp/mqttwarn-frigate-{camera}-{label}.png',
        'click': 'https://httpbin.org/anything?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}',
        # Wait for the file to arrive for three quarters of a second, and delete it after reading.
        '__settings__': {
            'file_retry_tries': 10,
            'file_retry_interval': 0.075,
            'file_unlink': True,
            }
        }
    }

[frigate/events]
filter = frigate_events_filter()
alldata = frigate_events()
targets = ntfy:test
title = {event.label} entered {event.entered_zones_str} at {event.time}
format = {event.label} was in {event.current_zones_str} before
# Limit the alert based on camera/zone.
frigate_skip_rules = {
    'rule-1': {'camera': ['frontyard'], 'entered_zones': ['lawn']},
    }

With kind regards, Footnotes
|
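To illustrate what the three file_* settings describe, here is a minimal sketch of the behaviour they suggest: poll for the file a limited number of times, read it, and optionally delete it. This is an assumption about the semantics, not mqttwarn's actual implementation. The function name read_file_with_retry is hypothetical; the defaults are taken from the configuration above.

```python
import os
import tempfile
import time
from typing import Optional


def read_file_with_retry(path: str, tries: int = 10, interval: float = 0.075,
                         unlink: bool = True) -> Optional[bytes]:
    """Poll for `path`; return its content, or None if it never shows up."""
    for _ in range(tries):
        if os.path.exists(path):
            with open(path, "rb") as f:
                data = f.read()
            if unlink:
                # Delete the file after reading, like `file_unlink: True`.
                os.unlink(path)
            return data
        # Not there yet: wait a moment before the next attempt.
        time.sleep(interval)
    return None


with tempfile.TemporaryDirectory() as tmp:
    snapshot = os.path.join(tmp, "snapshot.png")
    with open(snapshot, "wb") as f:
        f.write(b"fake image bytes")
    print(read_file_with_retry(snapshot))  # content of the file
    print(os.path.exists(snapshot))        # whether the file still exists
```

With 10 tries and an interval of 0.075 seconds, the worst-case wait is 0.75 seconds, which matches the comment in the configuration.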
mqttwarn 0.34.0 has been released, Frigate » Forward events and snapshots to ntfy has the corresponding documentation. I will be happy to hear back from you, about if you can validate it also works well on your end, or if we would need to add a few more adjustments. When everything works well, we could add corresponding stubs to the integration pages on both the Frigate, and the ntfy documentation 12. Footnotes |
In order to shed more light onto the current implementation, and to set the stage for subsequent actions on it, please consider those two fragments of the mqttwarn configuration file for the Frigate integration scenario, frigate.ini. I was expecting that there would be the need to introduce concurrency and synchronization into the user-defined functions file frigate.py, in order to properly handle the two distinct MQTT messages emitted by Frigate, and process them properly, but it turned out that mqttwarn has all the machinery in place to make that happen without further ado. Specifically, it is the The other detail is the Footnotes
|
Sorry, I have been away from this for a while. I will read up on all the amazing work you all have been doing. From what I am seeing, there is now an ntfy notifier which can send directly to an ntfy server. That is amazing and I can't wait to try it out. |
I set up a Frigate -> Mosquitto -> mqttwarn -> Apprise -> Ntfy -> Nginx flow (whew) and everything is working... Almost. The filter seems to not be blocking bad messages even though it should be based on my debugging. And, I can't figure out how to send attachments. My only lead is to save the attachment to a local file before using the attach query parameter in my Ntfy URI, but it would be nice to be able to send it through the body of the message instead, which I believe Apprise supports.

Depending on if any of this not working is a bug or potential feature request, this issue could turn into an actual issue and not a support request :)
mqttwarn.ini
funcs.py
Sample payload