Add an MQTT Alert Handler #1345

Merged
merged 3 commits into from
Jul 24, 2017

@timraymond timraymond commented Apr 27, 2017

Required for all non-trivial PRs
  • Rebased/mergeable
  • Tests pass
  • CHANGELOG.md updated
  • Sign CLA (if not already signed)

Connect #1151

Required only if applicable

You can erase any checkboxes below this note if they are not applicable to your Pull Request.

@nathanielc nathanielc left a comment

@timraymond Thanks! I have made a few comments below, mostly about small things around name convention etc.

There are two larger changes:

  • The Update method of the MQTT service doesn't appear to update the live client connection, and it needs to. The basic idea is that Kapacitor allows configuration to be modified via the API at runtime, so the Update function is in charge of applying those updates, and they need to take effect immediately.
  • The server/server_test.go file will need tests added for your new service. How easy is it to mock out an MQTT broker? If it's not easy, let's discuss ways to test without it.

Thanks again this looks great. 🎉

@@ -350,6 +351,10 @@ type AlertNode struct {
// tick:ignore
TalkHandlers []*TalkHandler `tick:"Talk"`

// Send alert to MQTT
// tick:ignore
MQTTHandlers []*MQTTHandler `tick:"MQTT"`
Contributor

This will read like this in TICKscript:

|alert()
    .mQTT()

Which is really odd, so I think we should change this and the method name below to Mqtt so that it is all lowercase in TICKscript.
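To illustrate the naming concern, here is a minimal sketch. It assumes the TICKscript name is derived by lowercasing only the first letter of the Go identifier, which is what the `.mQTT()` rendering above suggests. `lowerFirst` is a hypothetical helper written for this illustration, not Kapacitor's actual code.

```go
package main

import (
	"fmt"
	"unicode"
	"unicode/utf8"
)

// lowerFirst is a hypothetical helper: it lowercases only the first rune of
// name, which is the assumed Go-to-TICKscript naming rule.
func lowerFirst(name string) string {
	if name == "" {
		return name
	}
	r, size := utf8.DecodeRuneInString(name)
	return string(unicode.ToLower(r)) + name[size:]
}

func main() {
	fmt.Println(lowerFirst("MQTT")) // mQTT
	fmt.Println(lowerFirst("Mqtt")) // mqtt
}
```

Under that assumption, renaming the Go method to `Mqtt` is what yields the all-lowercase `.mqtt()` in TICKscript.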

@@ -942,6 +947,23 @@ func (a *AlertaHandler) Services(service ...string) *AlertaHandler {
return a
}

// Send alert to an MQTT broker
// tick:property
func (a *AlertNode) MQTT() *MQTTHandler {
Contributor

Can we accept the topic string here in this method? That way it reads like

|alert()
    .mqtt('topic goes here')

Since there is no other config for the MQTT handler this keeps it simple.

// Port of the MQTT Broker
Port uint16 `toml:"port" override:"port"`

ClientID string `toml:"client_id" override:"client_id"`
Contributor

Kapacitor uses dashes instead of underscores. Can you update this one and the others in this config struct?

// Broker formats the configured Host and Port as tcp://host:port, suitable for
// consumption by the Paho MQTT Client
func (c Config) Broker() string {
return fmt.Sprintf("tcp://%s:%d", c.Host, c.Port)
Contributor

What if the host is an IPv6 address, which itself contains colons? I believe you can use the URL type here to build the Broker string safely without worrying about such edge cases.
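One possible shape for that suggestion, as a sketch only: `net.JoinHostPort` adds the brackets an IPv6 literal needs, and `url.URL` assembles the final string. The function name `brokerURL` and its free-standing signature are assumptions for the example; the PR's version is a method on `Config`.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// brokerURL formats host and port as a tcp:// URL. net.JoinHostPort wraps
// IPv6 literals in brackets so the port separator stays unambiguous.
func brokerURL(host string, port uint16) string {
	u := url.URL{
		Scheme: "tcp",
		Host:   net.JoinHostPort(host, strconv.Itoa(int(port))),
	}
	return u.String()
}

func main() {
	fmt.Println(brokerURL("localhost", 1883)) // tcp://localhost:1883
	fmt.Println(brokerURL("::1", 1883))       // tcp://[::1]:1883
}
```

With the `fmt.Sprintf("tcp://%s:%d", ...)` form, the second call would produce `tcp://::1:1883`, where the host and port can no longer be told apart.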

"github.com/influxdata/kapacitor/alert"
)

type QOSLevel byte
Contributor

Is QOSLevel a Quality of Service level? If so, should we name it QoSLevel?

const DEFAULT_QUIESCE_TIMEOUT time.Duration = 250 * time.Millisecond

type Service struct {
Logger *log.Logger
Contributor

Logger should be private.

if c, ok := newConfig[0].(Config); !ok {
return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0])
} else {
s.configValue.Store(c)
Contributor

How do the changes to the config get propagated to the client?

Contributor Author

lol! That would help, wouldn't it?


type HandlerConfig struct {
Topic string
QOS QOSLevel
Contributor

Again QoS is probably a better name here.

}

func (s *Service) Alert(qos QOSLevel, topic, message string) error {
s.client.Publish(topic, byte(qos), false, message) // should retained be configureable?
Contributor

What does retained do?

Contributor Author

Ah, good catch. I meant to research this some more. Publishers have no guarantee that messages will be received by a subscriber to a topic, since there might not be any clients listening at the time the message was published. In that case, the message is dropped by the broker. Retained (which is the 3rd param in Publish) sets a flag on the message that causes the broker to hold onto it and deliver it immediately to clients that subsequently subscribe to that topic. The broker holds onto at most one message per topic, so retained messages are useful for describing state ("light switch is on"). Given this, I think the answer to my question is "yes," and it should probably be configurable from within the TICKscript.

This entire series of posts has been invaluable: MQTT Essentials Part 8: Retained Messages
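The retained-message behaviour described above can be sketched with a toy in-memory broker. Everything here (`toyBroker`, `newToyBroker`, the topic name) is hypothetical and only models the rule "at most one retained message per topic, replayed to late subscribers"; it is not the Paho client or a real broker.

```go
package main

import "fmt"

// toyBroker is a hypothetical in-memory stand-in for an MQTT broker. It only
// models the retained-message rule described above.
type toyBroker struct {
	retained    map[string]string         // at most one retained message per topic
	subscribers map[string][]func(string) // live subscribers per topic
}

func newToyBroker() *toyBroker {
	return &toyBroker{
		retained:    make(map[string]string),
		subscribers: make(map[string][]func(string)),
	}
}

// Publish delivers msg to current subscribers. If retained is true, the broker
// also keeps msg, replacing any earlier retained message on the topic.
func (b *toyBroker) Publish(topic, msg string, retained bool) {
	for _, deliver := range b.subscribers[topic] {
		deliver(msg)
	}
	if retained {
		b.retained[topic] = msg
	}
}

// Subscribe registers deliver and immediately replays the retained message,
// if the topic has one.
func (b *toyBroker) Subscribe(topic string, deliver func(string)) {
	b.subscribers[topic] = append(b.subscribers[topic], deliver)
	if msg, ok := b.retained[topic]; ok {
		deliver(msg)
	}
}

func main() {
	b := newToyBroker()
	b.Publish("alerts/cpu", "nobody is listening", false) // dropped
	b.Publish("alerts/cpu", "CRITICAL", true)
	b.Publish("alerts/cpu", "OK", true) // replaces "CRITICAL"

	var got []string
	b.Subscribe("alerts/cpu", func(msg string) { got = append(got, msg) })
	fmt.Println(got) // [OK]
}
```

The late subscriber sees only the most recent retained message, which is why the flag suits state-like alerts and is worth exposing per alert.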

vendor.list Outdated
@@ -4,6 +4,7 @@ github.com/cenkalti/backoff
github.com/davecgh/go-spew
github.com/dgrijalva/jwt-go v3.0.0
github.com/dustin/go-humanize
github.com/eclipse/paho.mqtt.golang
Contributor

Can you rebase your changes? This file is no longer in use in master. Kapacitor has switched over to using the new golang dep tool. Info on how to use it is in the CONTRIBUTING.md file.

@timraymond timraymond force-pushed the feature/tr-mqtt-alert branch from d24ecd1 to 943164c Compare May 1, 2017 21:56
@timraymond
Contributor Author

@nathanielc okay, I think this is ready for another look (I see that I need to rebase again, though). I moved Topic into the argument for mqtt() at your suggestion. However, I needed to add two more parameters for QoS and Retained, since the more I read, the more it made sense to control those on an alert-by-alert basis.

Regarding testing, I've come up with three options:

  1. I read through the spec a bit, and I think I can put together a minimal broker that handles connections and published messages for a QoS of 0, which is what we'd need for a minimal test.
  2. Alternatively, we could wrap the client in an interface and substitute a mock. It seems like that should be possible, but I'm not quite sure how to do the substitution with the existing test machinery that's in place. I think this option is the best, since we're not testing that the MQTT client is implemented correctly.
  3. If we don't want to go that route, we could install Mosquitto in the test containers... but that requires everyone that works on Kapacitor to have a broker running if they want to run the tests, which is kind of 💩 .
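The interface-and-mock option could look roughly like the sketch below. The `Client` method set, `mockClient`, `published`, and `sendAlert` are assumptions made for illustration; the point is only that the service code talks to an interface, so a test can substitute a recorder for the real MQTT client.

```go
package main

import "fmt"

// QoSLevel mirrors the type discussed in this PR.
type QoSLevel byte

// Client is the seam: service code depends on this interface rather than on
// a concrete MQTT client. The method set is an assumption for this sketch.
type Client interface {
	Connect() error
	Disconnect()
	Publish(topic string, qos QoSLevel, retained bool, message []byte) error
}

// published records the arguments of one Publish call.
type published struct {
	Topic    string
	QoS      QoSLevel
	Retained bool
	Message  string
}

// mockClient is a hypothetical test double that records what was published.
type mockClient struct {
	connected bool
	calls     []published
}

func (m *mockClient) Connect() error { m.connected = true; return nil }
func (m *mockClient) Disconnect()    { m.connected = false }

func (m *mockClient) Publish(topic string, qos QoSLevel, retained bool, message []byte) error {
	if !m.connected {
		return fmt.Errorf("publish to %q: client is not connected", topic)
	}
	m.calls = append(m.calls, published{topic, qos, retained, string(message)})
	return nil
}

// sendAlert stands in for the service code under test.
func sendAlert(c Client, topic, message string) error {
	return c.Publish(topic, 0, false, []byte(message))
}

func main() {
	m := &mockClient{}
	if err := m.Connect(); err != nil {
		panic(err)
	}
	if err := sendAlert(m, "alerts", "cpu is high"); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", m.calls)
}
```

A test can then assert on `m.calls` without any broker running.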

@nathanielc nathanielc left a comment

Just a few more smaller changes.

As for testing, if you can mock the client, that would be best. A package-level test at service/mqtt would be enough to start.

Gopkg.toml Outdated
## what source location any dependent projects specify.
# source = "https://github.com/myfork/package.git"


Contributor

Can you remove these comments?


ClientID string `toml:"client-id" override:"client-id"`
Username string `toml:"username" override:"username"`
Password string `toml:"password" override:"password"`
Contributor

If you update this override:"password" to override:"password,redact" then the config API will redact the password in all relevant API calls.


// QoSLevel indicates the quality of service for messages delivered to a
// broker.
type QoSLevel byte
Contributor

Can you implement the TextUnmarshaler interface on this type so that its TOML representation is not numeric?
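A minimal sketch of what that could look like, using the standard `encoding.TextUnmarshaler` and `encoding.TextMarshaler` interfaces. The text names (`AtMostOnce`, `AtLeastOnce`, `ExactlyOnce`) are assumptions chosen to match the usual descriptions of MQTT QoS 0, 1, and 2; the names actually used in the PR may differ.

```go
package main

import (
	"encoding"
	"fmt"
)

// QoSLevel indicates the quality of service for messages delivered to a broker.
type QoSLevel byte

const (
	AtMostOnce  QoSLevel = iota // MQTT QoS 0
	AtLeastOnce                 // MQTT QoS 1
	ExactlyOnce                 // MQTT QoS 2
)

// Compile-time checks that the type satisfies both text interfaces.
var (
	_ encoding.TextUnmarshaler = (*QoSLevel)(nil)
	_ encoding.TextMarshaler   = QoSLevel(0)
)

// UnmarshalText parses the (assumed) text names into a QoSLevel.
func (q *QoSLevel) UnmarshalText(text []byte) error {
	switch s := string(text); s {
	case "AtMostOnce":
		*q = AtMostOnce
	case "AtLeastOnce":
		*q = AtLeastOnce
	case "ExactlyOnce":
		*q = ExactlyOnce
	default:
		return fmt.Errorf("unknown QoS level %q", s)
	}
	return nil
}

// MarshalText is the inverse of UnmarshalText.
func (q QoSLevel) MarshalText() ([]byte, error) {
	switch q {
	case AtMostOnce:
		return []byte("AtMostOnce"), nil
	case AtLeastOnce:
		return []byte("AtLeastOnce"), nil
	case ExactlyOnce:
		return []byte("ExactlyOnce"), nil
	}
	return nil, fmt.Errorf("unknown QoS level %d", byte(q))
}

func main() {
	var q QoSLevel
	if err := q.UnmarshalText([]byte("AtLeastOnce")); err != nil {
		panic(err)
	}
	fmt.Println(byte(q)) // 1
}
```

Decoders that honor `encoding.TextUnmarshaler` would then accept a string for this field instead of a bare number.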

// session.
opts.SetCleanSession(true)

s.client = pahomqtt.NewClient(opts)
Contributor

Since s.client is being modified here you will need to lock access to it or use the atomic.Value type like you do for the config struct.
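The locking variant might be sketched as follows. `fakeClient`, `newFakeClient`, and the trimmed-down `Config` and `Service` are hypothetical stand-ins so the example runs without a broker; only the pattern (build the new client, swap it in under a mutex, disconnect the old one afterwards) is the point.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"sync"
)

// Config is a trimmed-down stand-in for the service config.
type Config struct {
	Host string
	Port uint16
}

// fakeClient is a hypothetical stand-in for a connected MQTT client.
type fakeClient struct {
	broker string
	open   bool
}

func newFakeClient(c Config) *fakeClient {
	return &fakeClient{
		broker: net.JoinHostPort(c.Host, strconv.Itoa(int(c.Port))),
		open:   true,
	}
}

func (c *fakeClient) Disconnect() { c.open = false }

// Service guards its client with a mutex so Update and readers never race.
type Service struct {
	mu     sync.Mutex
	client *fakeClient
}

// Update builds a client from the new config, swaps it in under the lock,
// and disconnects the previous client after the lock is released.
func (s *Service) Update(c Config) {
	next := newFakeClient(c)
	s.mu.Lock()
	prev := s.client
	s.client = next
	s.mu.Unlock()
	if prev != nil {
		prev.Disconnect()
	}
}

// Broker returns the address the current client points at.
func (s *Service) Broker() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.client == nil {
		return ""
	}
	return s.client.broker
}

func main() {
	s := &Service{}
	s.Update(Config{Host: "localhost", Port: 1883})
	s.Update(Config{Host: "broker.example", Port: 8883})
	fmt.Println(s.Broker()) // broker.example:8883
}
```

This also addresses the earlier point about Update: a config change takes effect as soon as the swap completes.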

@timraymond timraymond force-pushed the feature/tr-mqtt-alert branch from a93cd44 to 4fbd33e Compare May 23, 2017 14:30
@timraymond
Contributor Author

@nathanielc This should be ready for another look, if you get a chance

@nathanielc nathanielc force-pushed the feature/tr-mqtt-alert branch from 54f8ee0 to 8f46039 Compare July 21, 2017 15:41
@nathanielc
Contributor

@timraymond I am not quite done (still tracking down one bug), but I have added a commit that makes it possible to have multiple MQTT configurations. I am also adding tests, which are not quite passing yet, but I figured I'd push this up before I disappear for the weekend. Take a look when you have a sec, no rush.

@timraymond timraymond left a comment

Looks awesome! I just had one question.

)

// Client describes an immutable MQTT client, designed to accommodate the
// incongruencies between real clients and mock clients.
type Client interface {
Connect() error
Disconnect()
- Publish(string, byte, bool, string) error
+ Publish(topic string, qos QoSLevel, retained bool, message []byte) error
Contributor Author

I didn't realize that you could pass in argument names in an interface declaration. Does that place any requirements on implementors or is it solely a documentation thing?

Contributor

It just documents the arguments better. The implementation can still use whatever names it wants.
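A small sketch of that point: parameter names in an interface method are documentation only, and an implementation with different names still satisfies it as long as the parameter and result types match. `Publisher` and `printer` are made-up names for the example.

```go
package main

import "fmt"

// Publisher names its parameters purely as documentation.
type Publisher interface {
	Publish(topic string, retained bool, message []byte) error
}

// printer satisfies Publisher even though it uses different parameter names;
// only the parameter and result types have to match.
type printer struct{ last string }

func (p *printer) Publish(t string, keep bool, body []byte) error {
	p.last = fmt.Sprintf("%s retained=%t %s", t, keep, body)
	return nil
}

// Compile-time check that *printer implements Publisher.
var _ Publisher = (*printer)(nil)

func main() {
	p := &printer{}
	var pub Publisher = p
	if err := pub.Publish("alerts", true, []byte("on")); err != nil {
		panic(err)
	}
	fmt.Println(p.last) // alerts retained=true on
}
```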

@nathanielc nathanielc force-pushed the feature/tr-mqtt-alert branch from d4fa1a5 to 335784f Compare July 24, 2017 15:58
timraymond and others added 2 commits July 24, 2017 10:36
Much of this code was pulled from the guide published here:
https://github.com/influxdata/kapacitor/blob/master/alert/HANDLERS.md .

This also adds eclipse/paho.mqtt.golang as a dependency to Kapacitor.
This client was chosen primarily because it is the same one that is used
in Telegraf. It is actively developed, and is widely used by other
popular projects in the community (e.g. Gobot).
@nathanielc nathanielc force-pushed the feature/tr-mqtt-alert branch from 335784f to 2ef9adf Compare July 24, 2017 16:36
@nathanielc
Contributor

@timraymond All tests are passing and I have rebased and squashed commits. If it looks good to you then we can merge it.

@nathanielc
Contributor

@timraymond Can you add a CHANGELOG entry before merging?

@timraymond timraymond changed the title WIP: Add an MQTT Alert Handler Add an MQTT Alert Handler Jul 24, 2017
@timraymond timraymond merged commit fc8f75d into master Jul 24, 2017
@timraymond timraymond deleted the feature/tr-mqtt-alert branch July 24, 2017 19:02