Skip to content

Commit

Permalink
Merge pull request #194 from wutaizeng/master
Browse files Browse the repository at this point in the history
Added Talk service for alerting.
  • Loading branch information
Nathaniel Cook committed Feb 5, 2016
2 parents 5f7dd1f + cb1dce3 commit a36be3d
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 1 deletion.
21 changes: 21 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleOpsGenie(&pipeline.OpsGenieHandler{}, ad) })
}

for _, talk := range n.TalkHandlers {
talk := talk
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleTalk(talk, ad) })
}

// Parse level expressions
an.levels = make([]*tick.StatefulExpr, CritAlert+1)
if n.Info != nil {
Expand Down Expand Up @@ -740,3 +745,19 @@ func (a *AlertNode) handleOpsGenie(og *pipeline.OpsGenieHandler, ad *AlertData)
return
}
}

func (a *AlertNode) handleTalk(talk *pipeline.TalkHandler, ad *AlertData) {
if a.et.tm.TalkService == nil {
a.logger.Println("E! failed to send Talk message. Talk is not enabled")
return
}

err := a.et.tm.TalkService.Alert(
ad.ID,
ad.Message,
)
if err != nil {
a.logger.Println("E! failed to send alert data to Talk:", err)
return
}
}
7 changes: 7 additions & 0 deletions cmd/kapacitord/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/stats"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Config struct {
Stats stats.Config `toml:"stats"`
UDF udf.Config `toml:"udf"`
Deadman deadman.Config `toml:"deadman"`
Talk talk.Config `toml:"talk"`

Hostname string `toml:"hostname"`
DataDir string `toml:"data_dir"`
Expand Down Expand Up @@ -89,6 +91,7 @@ func NewConfig() *Config {
c.Stats = stats.NewConfig()
c.UDF = udf.NewConfig()
c.Deadman = deadman.NewConfig()
c.Talk = talk.NewConfig()

return c
}
Expand Down Expand Up @@ -143,6 +146,10 @@ func (c *Config) Validate() error {
if err != nil {
return err
}
err = c.Talk.Validate()
if err != nil {
return err
}
for _, g := range c.Graphites {
if err := g.Validate(); err != nil {
return fmt.Errorf("invalid graphite config: %v", err)
Expand Down
12 changes: 12 additions & 0 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/stats"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
Expand Down Expand Up @@ -145,6 +146,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
s.appendAlertaService(c.Alerta)
s.appendSlackService(c.Slack)
s.appendSensuService(c.Sensu)
s.appendTalkService(c.Talk)

// Append InfluxDB services
s.appendCollectdService(c.Collectd)
Expand Down Expand Up @@ -388,6 +390,16 @@ func (s *Server) appendReportingService(c reporting.Config) {
}
}

func (s *Server) appendTalkService(c talk.Config) {
if c.Enabled {
l := s.LogService.NewLogger("[talk] ", log.LstdFlags)
srv := talk.NewService(c, l)
s.TaskMaster.TalkService = srv

s.Services = append(s.Services, srv)
}
}

// Err returns an error channel that multiplexes all out of band errors received from all services.
func (s *Server) Err() <-chan error { return s.err }

Expand Down
7 changes: 7 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ data_dir = "/var/lib/kapacitor"
# [udf.functions.pyavg.env]
# PYTHONPATH = "./udf/agent/py"

[talk]
# Configure Talk.
enabled = false
# The Talk webhook URL.
url = "https://jianliao.com/v2/services/webhook/uuid"
# The default authorName.
author_name = "Kapacitor"

##################################
# Input Methods, same as InfluxDB
Expand Down
65 changes: 65 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/sensu"
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/wlog"
Expand Down Expand Up @@ -1946,6 +1947,70 @@ stream
}
}

func TestStream_AlertTalk(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
type postData struct {
Title string `json:"title"`
Text string `json:"text"`
AuthorName string `json:"authorName"`
}
pd := postData{}
dec := json.NewDecoder(r.Body)
dec.Decode(&pd)

if exp := "Kapacitor"; pd.AuthorName != exp {
t.Errorf("unexpected source got %s exp %s", pd.AuthorName, exp)
}

if exp := "kapacitor/cpu/serverA is CRITICAL"; pd.Text != exp {
t.Errorf("unexpected text got %s exp %s", pd.Text, exp)
}

if exp := "kapacitor/cpu/serverA"; pd.Title != exp {
t.Errorf("unexpected text got %s exp %s", pd.Title, exp)
}

}))
defer ts.Close()

var script = `
stream
.from().measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.talk()
`

clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script, nil)
defer tm.Close()

c := talk.NewConfig()
c.URL = ts.URL
c.AuthorName = "Kapacitor"
sl := talk.NewService(c, logService.NewLogger("[test_talk] ", log.LstdFlags))
tm.TalkService = sl

err := fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
if err != nil {
t.Error(err)
}

if requestCount != 1 {
t.Errorf("unexpected requestCount got %d exp 1", requestCount)
}
}

func TestStream_AlertSigma(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
45 changes: 44 additions & 1 deletion pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const defaultDetailsTmpl = "{{ json . }}"
// See AlertNode.Info, AlertNode.Warn, and AlertNode.Crit below.
//
// Different event handlers can be configured for each AlertNode.
// Some handlers like Email, HipChat, Sensu, Slack, OpsGenie, VictorOps and PagerDuty have a configuration
// Some handlers like Email, HipChat, Sensu, Slack, OpsGenie, VictorOps, PagerDuty and Talk have a configuration
// option 'global' that indicates that all alerts implicitly use the handler.
//
// Available event handlers:
Expand All @@ -38,6 +38,7 @@ const defaultDetailsTmpl = "{{ json . }}"
// * OpsGenie -- Send alert to OpsGenie.
// * VictorOps -- Send alert to VictorOps.
// * PagerDuty -- Send alert to PagerDuty.
// * Talk -- Post alert message to Talk client.
//
// See below for more details on configuring each handler.
//
Expand Down Expand Up @@ -241,6 +242,10 @@ type AlertNode struct {
// Send alert to OpsGenie
// tick:ignore
OpsGenieHandlers []*OpsGenieHandler

// Send alert to Talk.
// tick:ignore
TalkHandlers []*TalkHandler
}

func newAlertNode(wants EdgeType) *AlertNode {
Expand Down Expand Up @@ -863,3 +868,41 @@ func (og *OpsGenieHandler) Recipients(recipients ...string) *OpsGenieHandler {
og.RecipientsList = recipients
return og
}

// Send the alert to Talk.
// To use Talk alerting you must first follow the steps to create a new incoming webhook.
//
// 1. Go to the URL https:/account.jianliao.com/signin.
// 2. Sign in with you account. under the Team tab, click "Integrations".
// 3. Select "Customize service", click incoming Webhook "Add" button.
// 4. After choose the topic to connect with "xxx", click "Confirm Add" button.
// 5. Once the service is created, you'll see the "Generate Webhook url".
//
// Place the 'Generate Webhook url' into the 'Talk' section of the Kapacitor configuration as the option 'url'.
//
// Example:
// [talk]
// enabled = true
// url = "https://jianliao.com/v2/services/webhook/uuid"
// author_name = "Kapacitor"
//
// Example:
// stream...
// .alert()
// .talk()
//
// Send alerts to Talk client.
//
// tick:property
func (a *AlertNode) Talk() *TalkHandler {
talk := &TalkHandler{
AlertNode: a,
}
a.TalkHandlers = append(a.TalkHandlers, talk)
return talk
}

// tick:embedded:AlertNode.Talk
type TalkHandler struct {
*AlertNode
}
26 changes: 26 additions & 0 deletions services/talk/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package talk

import (
"net/url"
)

type Config struct {
// Whether Talk integration is enabled.
Enabled bool `toml:"enabled"`
// The Talk webhook URL, can be obtained by adding Incoming Webhook integration.
URL string `toml:"url"`
// The default authorName, can be overriden per alert.
AuthorName string `toml:"author_name"`
}

func NewConfig() Config {
return Config{}
}

func (c Config) Validate() error {
_, err := url.Parse(c.URL)
if err != nil {
return err
}
return nil
}
61 changes: 61 additions & 0 deletions services/talk/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package talk

import (
"bytes"
"encoding/json"
"errors"
"log"
"net/http"
)

type Service struct {
url string
authorName string
logger *log.Logger
}

func NewService(c Config, l *log.Logger) *Service {
return &Service{
url: c.URL,
authorName: c.AuthorName,
logger: l,
}
}

func (s *Service) Open() error {
return nil
}

func (s *Service) Close() error {
return nil
}

func (s *Service) Alert(title, text string) error {
postData := make(map[string]interface{})
postData["title"] = title
postData["text"] = text
postData["authorName"] = s.authorName

var post bytes.Buffer
enc := json.NewEncoder(&post)
err := enc.Encode(postData)
if err != nil {
return err
}

resp, err := http.Post(s.url, "application/json", &post)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
type response struct {
Error string `json:"error"`
}
r := &response{Error: "failed to understand Talk response"}
dec := json.NewDecoder(resp.Body)
dec.Decode(r)
return errors.New(r.Error)
}
return nil
}
4 changes: 4 additions & 0 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type TaskMaster struct {
SensuService interface {
Alert(name, output string, level AlertLevel) error
}
TalkService interface {
Alert(title, text string) error
}
LogService LogService

// Incoming streams
Expand Down Expand Up @@ -130,6 +133,7 @@ func (tm *TaskMaster) New() *TaskMaster {
n.HipChatService = tm.HipChatService
n.AlertaService = tm.AlertaService
n.SensuService = tm.SensuService
n.TalkService = tm.TalkService
return n
}

Expand Down

0 comments on commit a36be3d

Please sign in to comment.