-
Notifications
You must be signed in to change notification settings - Fork 490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an MQTT Alert Handler #1345
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@timraymond Thanks! I have made a few comments below, mostly about small things around name convention etc.
There are two larger changes:
- The Update method of the MQTT service doesn't appear to be updating the live client connection and it needs to. The basic idea here is that Kapacitor allow configuration to be modified via the API at runtime, so the Update function is in charge of applying those updates and they need to take effect immediately.
- The server/server_test.go will need tests added for your new service. How easy is it to mock out an MQTT broker? If its not easy lets discuss ways to test without it.
Thanks again this looks great. 🎉
pipeline/alert.go
Outdated
@@ -350,6 +351,10 @@ type AlertNode struct { | |||
// tick:ignore | |||
TalkHandlers []*TalkHandler `tick:"Talk"` | |||
|
|||
// Send alert to MQTT | |||
// tick:ignore | |||
MQTTHandlers []*MQTTHandler `tick:"MQTT"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will read like this in TICKscript:
|alert()
.mQTT()
Which is really odd so I think we should change this and the method name below to Mqtt
so that in TICKscript it is all lower case.
pipeline/alert.go
Outdated
@@ -942,6 +947,23 @@ func (a *AlertaHandler) Services(service ...string) *AlertaHandler { | |||
return a | |||
} | |||
|
|||
// Send alert to an MQTT broker | |||
// tick:property | |||
func (a *AlertNode) MQTT() *MQTTHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we accept the topic string here in this method? That way it reads like
|alert()
.mqtt('topic goes here')
Since there is no other config for the MQTT handler this keeps it simple.
services/mqtt/config.go
Outdated
// Port of the MQTT Broker | ||
Port uint16 `toml:"port" override:"port"` | ||
|
||
ClientID string `toml:"client_id" override:"client_id"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kapacitor uses dashes instead of underscores. Can you update this one and the others in this config struct?
services/mqtt/config.go
Outdated
// Broker formats the configured Host and Port as tcp://host:port, suitable for | ||
// consumption by the Paho MQTT Client | ||
func (c Config) Broker() string { | ||
return fmt.Sprintf("tcp://%s:%d", c.Host, c.Port) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the host is an IPv6 address with colon etc? I believe you can use the URL type here to safely create the Broker string without worrying about edge cases.
services/mqtt/service.go
Outdated
"github.com/influxdata/kapacitor/alert" | ||
) | ||
|
||
type QOSLevel byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is QOSLevel a Qualty of Service Level? If so should we name it QoSLevel?
services/mqtt/service.go
Outdated
const DEFAULT_QUIESCE_TIMEOUT time.Duration = 250 * time.Millisecond | ||
|
||
type Service struct { | ||
Logger *log.Logger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logger should be private.
services/mqtt/service.go
Outdated
if c, ok := newConfig[0].(Config); !ok { | ||
return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0]) | ||
} else { | ||
s.configValue.Store(c) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do the changes to the config get propagated to the client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol! That would help, wouldn't it?
services/mqtt/service.go
Outdated
|
||
type HandlerConfig struct { | ||
Topic string | ||
QOS QOSLevel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again QoS is probably a better name here.
services/mqtt/service.go
Outdated
} | ||
|
||
func (s *Service) Alert(qos QOSLevel, topic, message string) error { | ||
s.client.Publish(topic, byte(qos), false, message) // should retained be configureable? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does retained do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good catch. I meant to research this some more. Publishers have no guarantee that messages will be received by a subscriber to a topic, since there might not be any clients listening at the time the message was published. In that case, the message is dropped by the broker. Retained (which is the 3rd param in Publish
) sets a flag on the message that causes the broker to hold onto it and deliver it immediately to clients that subsequently subscribe to that topic. The broker holds onto at most one message per topic, so they're useful in describing state ("light switch is on"). Given this, I think the answer to my question is "yes," and it should probably be configureable from within the TICK script.
This entire series of posts has been invaluable: MQTT Essentials Part 8: Retained Messages
vendor.list
Outdated
@@ -4,6 +4,7 @@ github.com/cenkalti/backoff | |||
github.com/davecgh/go-spew | |||
github.com/dgrijalva/jwt-go v3.0.0 | |||
github.com/dustin/go-humanize | |||
github.com/eclipse/paho.mqtt.golang |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you rebase your changes? This file is no longer in use in master. Kapacitor has switched over to using the new golang dep tool. Info on how to use it is in the CONTRIBUTING.md file.
d24ecd1
to
943164c
Compare
@nathanielc okay, I think this is ready for another look (I see that I need to rebase again though). I moved Regarding testing, I've come up with three options: I read through the spec a bit, and I think I can put together a minimal broker that handles connections and published messages for a QoS of 0, which is what we'd need for a minimal test. Alternatively, we could wrap the client in an interface and substitute a mock. It seems like that should be possible, but I'm not quite sure how to do the substitution with the existing test machinery that's in place. I think this option is the best, since we're not testing that the MQTT client is implemented correctly. If we don't want to go that route, we could install Mosquitto in the test containers... but that requires everyone that works on Kapacitor to have a broker running if they want to run the tests, which is kind of 💩 . |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few more smaller changes.
As for testing if you can mock the client that would be best. Just a package level test at service/mqtt would be enough to start.
Gopkg.toml
Outdated
## what source location any dependent projects specify. | ||
# source = "https://github.com/myfork/package.git" | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove these comments?
services/mqtt/config.go
Outdated
|
||
ClientID string `toml:"client-id" override:"client-id"` | ||
Username string `toml:"username" override:"username"` | ||
Password string `toml:"password" override:"password"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you update this override:"password"
to override:"password,redact"
then the config API will redact the password in all relevant API calls.
|
||
// QoSLevel indicates the quality of service for messages delivered to a | ||
// broker. | ||
type QoSLevel byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you implement the TextUnmarshaler interface on this type so its toml representation is not numeric?
services/mqtt/service.go
Outdated
// session. | ||
opts.SetCleanSession(true) | ||
|
||
s.client = pahomqtt.NewClient(opts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since s.client
is being modified here you will need to lock access to it or use the atomic.Value type like you do for the config struct.
a93cd44
to
4fbd33e
Compare
@nathanielc This should be ready for another look, if you get a chance |
54f8ee0
to
8f46039
Compare
@timraymond I am not quite done (still tracking down one bug), but I have added a commit that makes it so you have multiple mqtt's configured. I am also adding tests which are not quite passing yet but I figured I push this up before I disappear for the weekend. Take a look when you have a sec, no rush. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks awesome! I just had one question.
services/mqtt/client.go
Outdated
) | ||
|
||
// Client describes an immutable MQTT client, designed to accommodate the | ||
// incongruencies between real clients and mock clients. | ||
type Client interface { | ||
Connect() error | ||
Disconnect() | ||
Publish(string, byte, bool, string) error | ||
Publish(topic string, qos QoSLevel, retained bool, message []byte) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't realize that you could pass in argument names in an interface
declaration. Does that place any requirements on implementors or is it solely a documentation thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just documents the arguments better. The implementation can still use whatever names it wants.
d4fa1a5
to
335784f
Compare
Much of this code was pulled from the guide published here: https://github.com/influxdata/kapacitor/blob/master/alert/HANDLERS.md . This also adds eclipse/paho.mqtt.golang as a dependency to Kapacitor. This client was chosen primarily because it is the same one that is used in Telegraf. It is actively developed, and is widely used by other popular projects in the community (e.g. Gobot).
335784f
to
2ef9adf
Compare
@timraymond All tests are passing and I have rebased and squashed commits. If it looks good to you then we can merge it. |
@timraymond Can you add a CHANGELOG entry before merging? |
Required for all non-trivial PRs
Connect #1151
Required only if applicable
You can erase any checkboxes below this note if they are not applicable to your Pull Request.