Skip to content

Commit

Permalink
allow multiple mqtt clients, add tests for alert handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jul 24, 2017
1 parent 3f0525b commit 335784f
Show file tree
Hide file tree
Showing 17 changed files with 706 additions and 265 deletions.
10 changes: 5 additions & 5 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

for _, m := range n.MQTTHandlers {
c := et.tm.MQTTService.DefaultHandlerConfig()
if m.Topic != "" {
c.Topic = m.Topic
c.QoS = mqtt.QoSLevel(m.QoS)
c.Retained = m.Retained
c := mqtt.HandlerConfig{
BrokerName: m.BrokerName,
Topic: m.Topic,
QoS: mqtt.QoSLevel(m.Qos),
Retained: m.Retained,
}
h := et.tm.MQTTService.Handler(c, l)
an.handlers = append(an.handlers, h)
Expand Down
32 changes: 32 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,38 @@ default-retention-policy = ""
# The default authorName.
author_name = "Kapacitor"

# MQTT client configuration.
# Mutliple different clients may be configured by
# repeating [[mqtt]] sections.
[[mqtt]]
enabled = false
# Unique name for this broker configuration
name = "localhost"
# Whether this broker configuration is the default
default = true
# URL of the MQTT broker.
# Possible protocols include:
# tcp - Raw TCP network connection
# ssl - TLS protected TCP network connection
# ws - Websocket network connection
url = "tcp://localhost:1883"

# TLS/SSL configuration
# A CA can be provided without a key/cert pair
# ssl-ca = "/etc/kapacitor/ca.pem"
# Absolutes paths to pem encoded key and cert files.
# ssl-cert = "/etc/kapacitor/cert.pem"
# ssl-key = "/etc/kapacitor/key.pem"

# Unique ID for this MQTT client.
# If empty used the value of "name"
client-id = ""

# Username
username = ""
# Password
password = ""

##################################
# Input Methods, same as InfluxDB
#
Expand Down
18 changes: 15 additions & 3 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,17 +1017,29 @@ func (a *AlertNode) Mqtt(topic string) *MQTTHandler {
return m
}

// tick:embedded:AlertNode.Mqtt
type MQTTHandler struct {
*AlertNode

// BrokerName is the name of the configured MQTT broker to use when publishing the alert.
// If empty defaults to the configured default broker.
BrokerName string

// The topic where alerts will be dispatched to
Topic string

// The QoS that will be used to deliver the alerts
QoS int64
// The Qos that will be used to deliver the alerts
//
// Valid values are:
//
// * 0 - At most once delivery
// * 1 - At least once delivery
// * 2 - Exactly once delivery
//
Qos int64

// Retained indicates whether this alert should be delivered to
// clients that were not connected to the broker at the time of the alert
// clients that were not connected to the broker at the time of the alert.
Retained bool
}

Expand Down
4 changes: 2 additions & 2 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Config struct {
// Alert handlers
Alerta alerta.Config `toml:"alerta" override:"alerta"`
HipChat hipchat.Config `toml:"hipchat" override:"hipchat"`
MQTT mqtt.Config `toml:"mqtt" override:"mqtt"`
MQTT mqtt.Configs `toml:"mqtt" override:"mqtt,element-key=name"`
OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"`
PagerDuty pagerduty.Config `toml:"pagerduty" override:"pagerduty"`
Pushover pushover.Config `toml:"pushover" override:"pushover"`
Expand Down Expand Up @@ -140,7 +140,7 @@ func NewConfig() *Config {

c.Alerta = alerta.NewConfig()
c.HipChat = hipchat.NewConfig()
c.MQTT = mqtt.NewConfig()
c.MQTT = mqtt.Configs{}
c.OpsGenie = opsgenie.NewConfig()
c.PagerDuty = pagerduty.NewConfig()
c.Pushover = pushover.NewConfig()
Expand Down
14 changes: 10 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,
// Append Alert integration services
s.appendAlertaService()
s.appendHipChatService()
s.appendMQTTService()
if err := s.appendMQTTService(); err != nil {
return nil, errors.Wrap(err, "mqtt service")
}
s.appendOpsGenieService()
s.appendPagerDutyService()
s.appendPushoverService()
Expand Down Expand Up @@ -457,16 +459,20 @@ func (s *Server) appendAuthService() {
s.AppendService("auth", srv)
}

func (s *Server) appendMQTTService() {
c := s.config.MQTT
func (s *Server) appendMQTTService() error {
cs := s.config.MQTT
l := s.LogService.NewLogger("[mqtt] ", log.LstdFlags)
srv := mqtt.NewService(c, l)
srv, err := mqtt.NewService(cs, l)
if err != nil {
return err
}

s.TaskMaster.MQTTService = srv
s.AlertService.MQTTService = srv

s.SetDynamicService("mqtt", srv)
s.AppendService("mqtt", srv)
return nil
}

func (s *Server) appendOpsGenieService() {
Expand Down
Loading

0 comments on commit 335784f

Please sign in to comment.