From 95c591be6e78d26b86d5d0ea06dc83650ae44570 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 6 Jan 2017 10:09:24 -0700 Subject: [PATCH] initial rework for the new alert system --- alert.go | 387 +----------------- integrations/streamer_test.go | 276 +++++++------ pipeline/alert.go | 28 +- server/config.go | 10 +- server/server.go | 10 +- services/alert/service.go | 15 + services/snmptrap/config.go | 29 +- services/snmptrap/service.go | 225 ++++++---- .../snmptrap/snmptraptest/snmptraptest.go | 105 +++++ task_master.go | 9 +- 10 files changed, 477 insertions(+), 617 deletions(-) create mode 100644 services/snmptrap/snmptraptest/snmptraptest.go diff --git a/alert.go b/alert.go index bbbfd86a9e..83aa3be18d 100644 --- a/alert.go +++ b/alert.go @@ -6,10 +6,7 @@ import ( "fmt" html "html/template" "log" - "net" - "net/http" "os" - "os/exec" "sync" text "text/template" "time" @@ -26,6 +23,7 @@ import ( "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/slack" "github.com/influxdata/kapacitor/services/smtp" + "github.com/influxdata/kapacitor/services/snmptrap" "github.com/influxdata/kapacitor/services/telegram" "github.com/influxdata/kapacitor/services/victorops" "github.com/influxdata/kapacitor/tick/stateful" @@ -242,9 +240,24 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * an.handlers = append(an.handlers, h) } - for _, snmpTrap := range n.SnmpTrapHandlers { - snmpTrap := snmpTrap - an.handlers = append(an.handlers, func(ad *AlertData) { an.handleSnmpTrap(snmpTrap, ad) }) + for _, s := range n.SNMPTrapHandlers { + dataList := make([]snmptrap.Data, len(s.DataList)) + for i, d := range s.DataList { + dataList[i] = snmptrap.Data{ + Oid: d.Oid, + Type: d.Type, + Value: d.Value, + } + } + c := snmptrap.HandlerConfig{ + TrapOid: s.TrapOid, + DataList: dataList, + } + h, err := et.tm.SNMPTrapService.Handler(c, l) + if err != nil { + return nil, errors.Wrapf(err, "failed to create SNMP 
handler") + } + an.handlers = append(an.handlers, h) } if len(n.TelegramHandlers) == 0 && (et.tm.TelegramService != nil && et.tm.TelegramService.Global()) { @@ -973,365 +986,5 @@ func (a *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group } details := tmpBuffer.String() - return msg, details, dinfo, nil -} - -//-------------------------------- -// Alert handlers - -func (a *AlertNode) handlePost(post *pipeline.PostHandler, ad *AlertData) { - bodyBuffer := a.bufPool.Get().(*bytes.Buffer) - defer func() { - bodyBuffer.Reset() - a.bufPool.Put(bodyBuffer) - }() - - err := json.NewEncoder(bodyBuffer).Encode(ad) - if err != nil { - a.logger.Println("E! failed to marshal alert data json", err) - return - } - - resp, err := http.Post(post.URL, "application/json", bodyBuffer) - if err != nil { - a.logger.Println("E! failed to POST batch", err) - return - } - - if resp == nil { - a.logger.Println("E! failed to POST batch response is nil") - return - } - - // close http response otherwise tcp socket will be 'ESTABLISHED' in a long time - defer resp.Body.Close() - return -} - -func (a *AlertNode) handleTcp(tcp *pipeline.TcpHandler, ad *AlertData) { - buf := a.bufPool.Get().(*bytes.Buffer) - defer func() { - buf.Reset() - a.bufPool.Put(buf) - }() - - err := json.NewEncoder(buf).Encode(ad) - if err != nil { - a.logger.Println("E! failed to marshal alert data json", err) - return - } - - conn, err := net.Dial("tcp", tcp.Address) - if err != nil { - a.logger.Println("E! failed to connect", err) - return - } - defer conn.Close() - - buf.WriteByte('\n') - conn.Write(buf.Bytes()) - - return -} - -func (a *AlertNode) handleEmail(email *pipeline.EmailHandler, ad *AlertData) { - if err := a.et.tm.SMTPService.SendMail(email.ToList, ad.Message, ad.Details); err != nil { - a.logger.Println("E! 
failed to send email:", err) - } -} - -func (a *AlertNode) handleExec(ex *pipeline.ExecHandler, ad *AlertData) { - b, err := json.Marshal(ad) - if err != nil { - a.logger.Println("E! failed to marshal alert data json", err) - return - } - cmd := exec.Command(ex.Command[0], ex.Command[1:]...) - cmd.Stdin = bytes.NewBuffer(b) - var out bytes.Buffer - cmd.Stdout = &out - cmd.Stderr = &out - err = cmd.Run() - if err != nil { - a.logger.Println("E! error running alert command:", err, out.String()) - return - } -} - -func (a *AlertNode) handleLog(l *pipeline.LogHandler, ad *AlertData) { - b, err := json.Marshal(ad) - if err != nil { - a.logger.Println("E! failed to marshal alert data json", err) - return - } - f, err := os.OpenFile(l.FilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.FileMode(l.Mode)) - if err != nil { - a.logger.Println("E! failed to open file for alert logging", err) - return - } - defer f.Close() - n, err := f.Write(b) - if n != len(b) || err != nil { - a.logger.Println("E! failed to write to file", err) - } - n, err = f.Write([]byte("\n")) - if n != 1 || err != nil { - a.logger.Println("E! failed to write to file", err) - } -} - -func (a *AlertNode) handleVictorOps(vo *pipeline.VictorOpsHandler, ad *AlertData) { - var messageType string - switch ad.Level { - case OKAlert: - messageType = "RECOVERY" - default: - messageType = ad.Level.String() - } - err := a.et.tm.VictorOpsService.Alert( - vo.RoutingKey, - messageType, - ad.Message, - ad.ID, - ad.Time, - ad.Data, - ) - if err != nil { - a.logger.Println("E! failed to send alert data to VictorOps:", err) - return - } -} - -func (a *AlertNode) handlePagerDuty(pd *pipeline.PagerDutyHandler, ad *AlertData) { - err := a.et.tm.PagerDutyService.Alert( - pd.ServiceKey, - ad.ID, - ad.Message, - ad.Level, - ad.Data, - ) - if err != nil { - a.logger.Println("E! 
failed to send alert data to PagerDuty:", err) - return - } -} - -func (a *AlertNode) handleSensu(sensu *pipeline.SensuHandler, ad *AlertData) { - err := a.et.tm.SensuService.Alert( - ad.ID, - ad.Message, - ad.Level, - ) - if err != nil { - a.logger.Println("E! failed to send alert data to Sensu:", err) - return - } -} - -func (a *AlertNode) handleSlack(slack *pipeline.SlackHandler, ad *AlertData) { - err := a.et.tm.SlackService.Alert( - slack.Channel, - ad.Message, - slack.Username, - slack.IconEmoji, - ad.Level, - ) - if err != nil { - a.logger.Println("E! failed to send alert data to Slack:", err) - return - } -} - -func (a *AlertNode) handleSnmpTrap(snmpTrap *pipeline.SnmpTrapHandler, ad *AlertData) { - if a.et.tm.SnmpTrapService == nil { - a.logger.Println("E! failed to send SNMP traps. SNMP is not enabled") - return - } - - // Template - /* - var buf bytes.Buffer - var tmpDataList [][]interface{} - for _, data := range snmpTrap.DataList { - var rowData []interface{} - for _, attr := range data { - err := attr.(*text.Template).Execute(&buf, ad.info) - if err != nil { - a.logger.Printf("E! failed to evaluate SNMP Trap attribute template %s", attr) - return - } - rowData = append(rowData, buf.String()) - buf.Reset() - } - tmpDataList = append(tmpDataList, rowData) - }*/ - - err := a.et.tm.SnmpTrapService.Alert( - snmpTrap.TrapOid, - snmpTrap.DataList, - ad.Level, - ) - if err != nil { - a.logger.Println("E! failed to send alert data by SNMP traps:", err) - return - } -} - -func (a *AlertNode) handleTelegram(telegram *pipeline.TelegramHandler, ad *AlertData) { - err := a.et.tm.TelegramService.Alert( - telegram.ChatId, - telegram.ParseMode, - ad.Message, - telegram.IsDisableWebPagePreview, - telegram.IsDisableNotification, - ) - if err != nil { - a.logger.Println("E! 
failed to send alert data to Telegram:", err) - return - } -} - -func (a *AlertNode) handleHipChat(hipchat *pipeline.HipChatHandler, ad *AlertData) { - err := a.et.tm.HipChatService.Alert( - hipchat.Room, - hipchat.Token, - ad.Message, - ad.Level, - ) - if err != nil { - a.logger.Println("E! failed to send alert data to HipChat:", err) - return - } -} - -type alertaHandler struct { - *pipeline.AlertaHandler - - resourceTmpl *text.Template - eventTmpl *text.Template - environmentTmpl *text.Template - valueTmpl *text.Template - groupTmpl *text.Template -} - -func (a *AlertNode) handleAlerta(alerta alertaHandler, ad *AlertData) { - var severity string - - switch ad.Level { - case OKAlert: - severity = "ok" - case InfoAlert: - severity = "informational" - case WarnAlert: - severity = "warning" - case CritAlert: - severity = "critical" - default: - severity = "indeterminate" - } - var buf bytes.Buffer - err := alerta.resourceTmpl.Execute(&buf, ad.info) - if err != nil { - a.logger.Printf("E! failed to evaluate Alerta Resource template %s", alerta.Resource) - return - } - resource := buf.String() - buf.Reset() - - type eventData struct { - idInfo - ID string - } - data := eventData{ - idInfo: ad.info.messageInfo.idInfo, - ID: ad.ID, - } - err = alerta.eventTmpl.Execute(&buf, data) - if err != nil { - a.logger.Printf("E! failed to evaluate Alerta Event template %s", alerta.Event) - return - } - event := buf.String() - buf.Reset() - - err = alerta.environmentTmpl.Execute(&buf, ad.info) - if err != nil { - a.logger.Printf("E! failed to evaluate Alerta Environment template %s", alerta.Environment) - return - } - environment := buf.String() - buf.Reset() - - err = alerta.groupTmpl.Execute(&buf, ad.info) - if err != nil { - a.logger.Printf("E! failed to evaluate Alerta Group template %s", alerta.Group) - return - } - group := buf.String() - buf.Reset() - - err = alerta.valueTmpl.Execute(&buf, ad.info) - if err != nil { - a.logger.Printf("E! 
failed to evaluate Alerta Value template %s", alerta.Value) - return - } - value := buf.String() - - service := alerta.Service - if len(alerta.Service) == 0 { - service = []string{ad.info.Name} - } - - err = a.et.tm.AlertaService.Alert( - alerta.Token, - resource, - event, - environment, - severity, - group, - value, - ad.Message, - alerta.Origin, - service, - ad.Data, - ) - if err != nil { - a.logger.Println("E! failed to send alert data to Alerta:", err) - return - } -} - -func (a *AlertNode) handleOpsGenie(og *pipeline.OpsGenieHandler, ad *AlertData) { - var messageType string - switch ad.Level { - case OKAlert: - messageType = "RECOVERY" - default: - messageType = ad.Level.String() - } - - err := a.et.tm.OpsGenieService.Alert( - og.TeamsList, - og.RecipientsList, - messageType, - ad.Message, - ad.ID, - ad.Time, - ad.Data, - ) - if err != nil { - a.logger.Println("E! failed to send alert data to OpsGenie:", err) - return - } -} - -func (a *AlertNode) handleTalk(talk *pipeline.TalkHandler, ad *AlertData) { - err := a.et.tm.TalkService.Alert( - ad.ID, - ad.Message, - ) - if err != nil { - a.logger.Println("E! 
failed to send alert data to Talk:", err) - return - } + return msg, details, nil } diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 87c04354d3..2fc71413eb 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -18,6 +18,7 @@ import ( "text/template" "time" + "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/client" "github.com/influxdata/influxdb/influxql" imodels "github.com/influxdata/influxdb/models" @@ -46,6 +47,7 @@ import ( "github.com/influxdata/kapacitor/services/smtp" "github.com/influxdata/kapacitor/services/smtp/smtptest" "github.com/influxdata/kapacitor/services/snmptrap" + "github.com/influxdata/kapacitor/services/snmptrap/snmptraptest" "github.com/influxdata/kapacitor/services/storage/storagetest" "github.com/influxdata/kapacitor/services/talk" "github.com/influxdata/kapacitor/services/talk/talktest" @@ -7348,60 +7350,13 @@ stream for i := range cmds { cmdsI[i] = cmds[i] } - if err := compareListIgnoreOrder(cmdsI, expCmds, func(got, exp interface{}) error { + if err := compareListIgnoreOrder(cmdsI, expCmds, func(got, exp interface{}) bool { g := got.(*commandtest.Command) e := exp.(*commandtest.Command) - return e.Compare(g) + return e.Compare(g) == nil }); err != nil { t.Error(err) } - - //for i := 0; i < 2; i++ { - // select { - // case cmd := <-cmdC: - // cmd.Lock() - // defer cmd.Unlock() - - // var err error - // for j, info := range expInfo { - // if got, exp := cmd.Info, info; reflect.DeepEqual(got, exp) { - // // Found match remove it - // if j == 0 { - // expInfo = expInfo[1:] - // } else { - // expInfo = expInfo[:1] - // } - // err = nil - // break - // } else { - // err = fmt.Errorf("%d unexpected command info:\ngot\n%+v\nexp\n%+v\n", i, got, exp) - // } - // } - // if err != nil { - // t.Error(err) - // } - - // if !cmd.Started { - // t.Errorf("%d expected command to have been started", i) - // } - // if !cmd.Waited { - // t.Errorf("%d expected command to have 
waited", i) - // } - // if cmd.Killed { - // t.Errorf("%d expected command not to have been killed", i) - // } - - // ad := alertservice.AlertData{} - // if err := json.Unmarshal(cmd.StdinData, &ad); err != nil { - // t.Fatal(err) - // } - // if got, exp := ad, expAD; !reflect.DeepEqual(got, exp) { - // t.Errorf("%d unexpected alert data sent to command:\ngot\n%+v\nexp\n%+v\n%s", i, got, exp, string(cmd.StdinData)) - // } - // default: - // t.Error("expected command to be created") - // } - //} } func TestStream_AlertEmail(t *testing.T) { @@ -7508,6 +7463,138 @@ Value: 10 } } +func TestStream_AlertSNMPTrap(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |window() + .period(10s) + .every(10s) + |count('value') + |alert() + .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') + .info(lambda: "count" > 6.0) + .warn(lambda: "count" > 7.0) + .crit(lambda: "count" > 8.0) + .snmpTrap('1.1.1') + .data('1.1.1.2', 'c', '1') + .data('1.1.1.2', 's', 'SNMP ALERT') + .data('1.1.1.2', 's', '{{.Message}}') + .snmpTrap('1.1.2') + .data('1.1.2.3', 'i', '10') + .data('1.1.2.3', 'n', '') + .data('1.1.2.3', 't', '20000') +` + + expTraps := []interface{}{ + snmptraptest.Trap{ + Pdu: snmptraptest.Pdu{ + Type: snmpgo.SNMPTrapV2, + ErrorStatus: snmpgo.NoError, + VarBinds: snmptraptest.VarBinds{ + { + Oid: "1.3.6.1.2.1.1.3.0", + Value: "1000", + Type: "TimeTicks", + }, + { + Oid: "1.3.6.1.6.3.1.1.4.1.0", + Value: "1.1.1", + Type: "Oid", + }, + { + Oid: "1.1.1.2", + Value: "1", + Type: "Counter64", + }, + { + Oid: "1.1.1.2", + Value: "SNMP ALERT", + Type: "OctetString", + }, + { + Oid: "1.1.1.2", + Value: "kapacitor/cpu/serverA is CRITICAL", + Type: "OctetString", + }, + }, + }, + }, + snmptraptest.Trap{ + Pdu: snmptraptest.Pdu{ + Type: snmpgo.SNMPTrapV2, + ErrorStatus: snmpgo.NoError, + VarBinds: snmptraptest.VarBinds{ + { + Oid: "1.3.6.1.2.1.1.3.0", + Value: "1000", + Type: "TimeTicks", + }, + { + Oid: 
"1.3.6.1.6.3.1.1.4.1.0", + Value: "1.1.2", + Type: "Oid", + }, + { + Oid: "1.1.2.3", + Value: "10", + Type: "Integer", + }, + { + Oid: "1.1.2.3", + Value: "", + Type: "Null", + }, + { + Oid: "1.1.2.3", + Value: "20000", + Type: "TimeTicks", + }, + }, + }, + }, + } + + snmpServer, err := snmptraptest.NewServer() + if err != nil { + t.Fatal(err) + } + defer snmpServer.Close() + + c := snmptrap.NewConfig() + c.Enabled = true + c.TargetIp = "127.0.0.1" + c.TargetPort = 9162 + c.Community = "public" + c.Retries = 2 + st := snmptrap.NewService(c, logService.NewLogger("[test_snmptrap] ", log.LstdFlags)) + if err := st.Open(); err != nil { + t.Fatal(err) + } + defer st.Close() + + tmInit := func(tm *kapacitor.TaskMaster) { + tm.SNMPTrapService = st + } + + testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit) + + snmpServer.Close() + + traps := snmpServer.Traps() + got := make([]interface{}, len(traps)) + for i, t := range traps { + got[i] = t + } + if err := compareListIgnoreOrder(got, expTraps, nil); err != nil { + t.Error(err) + } +} + func TestStream_AlertSigma(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -8443,108 +8530,31 @@ func testStreamerWithOutput( } } -func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface{}) error) error { +func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface{}) bool) error { if len(got) != len(exp) { - return fmt.Errorf("unexpected count got %d exp %d", len(got), len(exp)) + return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp)) } if cmpF == nil { - cmpF = func(got, exp interface{}) error { + cmpF = func(got, exp interface{}) bool { if !reflect.DeepEqual(got, exp) { - return fmt.Errorf("\ngot\n%+v\nexp\n%+v\n", got, exp) + return false } - return nil + return true } } for _, e := range exp { found := false - var err error for _, g 
:= range got { - if err = cmpF(g, e); err == nil { + if cmpF(g, e) { found = true break } } if !found { - return err + return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp)) } } return nil } - -// SNMPTrap - -type TrapListener struct { - data string -} - -func (l *TrapListener) OnTRAP(trap *snmpgo.TrapRequest) { - l.data = trap.Pdu.String() -} - -func NewTrapListener(data *string) *TrapListener { - return &TrapListener{*data} -} - -func TestStream_AlertSnmpTrap(t *testing.T) { - trapData := "" - server, err := snmpgo.NewTrapServer(snmpgo.ServerArguments{ - LocalAddr: "127.0.0.1:9162", - }) - if err != nil { - log.Fatal(err) - } - // V2c - err = server.AddSecurity(&snmpgo.SecurityEntry{ - Version: snmpgo.V2c, - Community: "public", - }) - if err != nil { - log.Fatal(err) - } - go server.Serve(NewTrapListener(&trapData)) - - var script = ` -stream - |from() - .measurement('cpu') - .where(lambda: "host" == 'serverA') - .groupBy('host') - |window() - .period(10s) - .every(10s) - |count('value') - |alert() - .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') - .info(lambda: "count" > 6.0) - .warn(lambda: "count" > 7.0) - .crit(lambda: "count" > 8.0) - .snmptrap('1.1.1') - .data('1.1.1.2', 's', 'SNMP ALERT') -` - - clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script, nil) - defer tm.Close() - - c := snmptrap.NewConfig() - c.Enabled = true - c.TargetIp = "127.0.0.1" - c.TargetPort = 9162 - c.Community = "public" - c.Version = "2c" - st := snmptrap.NewService(c, logService.NewLogger("[test_snmptrap] ", log.LstdFlags)) - tm.SnmpTrapService = st - - err = fastForwardTask(clock, et, replayErr, tm, 13*time.Second) - if err != nil { - t.Error(err) - } - - // Stop server - server.Close() - - if trapData == "" { - t.Errorf("No SNMP Trap data received") - } -} diff --git a/pipeline/alert.go b/pipeline/alert.go index c6cf9c0dbb..e07f260745 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -339,7 +339,7 
@@ type AlertNode struct { // Send alert using SNMPtraps. // tick:ignore - SnmpTrapHandlers []*SnmpTrapHandler `tick:"SnmpTrap"` + SNMPTrapHandlers []*SNMPTrapHandler `tick:"SnmpTrap"` } func newAlertNode(wants EdgeType) *AlertNode { @@ -1265,25 +1265,31 @@ type TalkHandler struct { // Send alerts to `target-ip:target-port` on OID '1.3.6.1.2.1.1.7' // // tick:property -func (a *AlertNode) SnmpTrap(trapOid string, dataList ...[3]string) *SnmpTrapHandler { - snmpTrap := &SnmpTrapHandler{ +func (a *AlertNode) SnmpTrap(trapOid string) *SNMPTrapHandler { + snmpTrap := &SNMPTrapHandler{ AlertNode: a, TrapOid: trapOid, - DataList: dataList, } - a.SnmpTrapHandlers = append(a.SnmpTrapHandlers, snmpTrap) + a.SNMPTrapHandlers = append(a.SNMPTrapHandlers, snmpTrap) return snmpTrap } // SNMPTrap AlertHandler // tick:embedded:AlertNode.SnmpTrap -type SnmpTrapHandler struct { +type SNMPTrapHandler struct { *AlertNode TrapOid string // List of trap data. // tick:ignore - DataList [][3]string `tick:"Data"` + DataList []SNMPData `tick:"Data"` +} + +// tick:ignore +type SNMPData struct { + Oid string + Type string + Value string } // Define Data for SNMP Trap alert. 
@@ -1306,9 +1312,13 @@ type SnmpTrapHandler struct { // .data('1.3.6.1.4.1..1.7', 's', '{{ index .Fields "used_percent" }}' ) // // tick:property -func (h *SnmpTrapHandler) Data(oid, rawType, value string) *SnmpTrapHandler { +func (h *SNMPTrapHandler) Data(oid, t, value string) *SNMPTrapHandler { // TODO check element validity - data := [3]string{oid, rawType, value} + data := SNMPData{ + Oid: oid, + Type: t, + Value: value, + } h.DataList = append(h.DataList, data) return h } diff --git a/server/config.go b/server/config.go index f929042bf7..9a1c9d0e5d 100644 --- a/server/config.go +++ b/server/config.go @@ -66,12 +66,12 @@ type Config struct { OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"` PagerDuty pagerduty.Config `toml:"pagerduty" override:"pagerduty"` SMTP smtp.Config `toml:"smtp" override:"smtp"` + SNMPTrap snmptrap.Config `toml:"snmptrap" override:"snmptrap"` Sensu sensu.Config `toml:"sensu" override:"sensu"` Slack slack.Config `toml:"slack" override:"slack"` Talk talk.Config `toml:"talk" override:"talk"` Telegram telegram.Config `toml:"telegram" override:"telegram"` VictorOps victorops.Config `toml:"victorops" override:"victorops"` - SnmpTrap snmptrap.Config `toml:"snmptrap" override:"snmptrap"` // Third-party integrations Kubernetes k8s.Config `toml:"kubernetes" override:"kubernetes"` @@ -117,7 +117,7 @@ func NewConfig() *Config { c.Sensu = sensu.NewConfig() c.Slack = slack.NewConfig() c.Talk = talk.NewConfig() - c.SnmpTrap = snmptrap.NewConfig() + c.SNMPTrap = snmptrap.NewConfig() c.Telegram = telegram.NewConfig() c.VictorOps = victorops.NewConfig() @@ -225,13 +225,13 @@ func (c *Config) Validate() error { if err := c.SMTP.Validate(); err != nil { return err } - if err := c.Sensu.Validate(); err != nil { + if err := c.SNMPTrap.Validate(); err != nil { return err } - if err := c.Slack.Validate(); err != nil { + if err := c.Sensu.Validate(); err != nil { return err } - if err := c.SnmpTrap.Validate(); err != nil { + if err := 
c.Slack.Validate(); err != nil { return err } if err := c.Talk.Validate(); err != nil { diff --git a/server/server.go b/server/server.go index 5ab5ac990c..8f5d8cd507 100644 --- a/server/server.go +++ b/server/server.go @@ -190,7 +190,7 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server, s.appendHipChatService() s.appendAlertaService() s.appendSlackService() - s.appendSnmpTrapService() + s.appendSNMPTrapService() s.appendSensuService() s.appendSlackService() s.appendTalkService() @@ -450,11 +450,13 @@ func (s *Server) appendSlackService() { s.AppendService("slack", srv) } -func (s *Server) appendSnmpTrapService() { - c := s.config.SnmpTrap +func (s *Server) appendSNMPTrapService() { + c := s.config.SNMPTrap l := s.LogService.NewLogger("[snmptrap] ", log.LstdFlags) srv := snmptrap.NewService(c, l) - s.TaskMaster.SnmpTrapService = srv + + s.TaskMaster.SNMPTrapService = srv + s.AlertService.SNMPTrapService = srv s.SetDynamicService("snmptrap", srv) s.AppendService("snmptrap", srv) diff --git a/services/alert/service.go b/services/alert/service.go index 5840493906..c59aa5657a 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -22,6 +22,7 @@ import ( "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/slack" "github.com/influxdata/kapacitor/services/smtp" + "github.com/influxdata/kapacitor/services/snmptrap" "github.com/influxdata/kapacitor/services/storage" "github.com/influxdata/kapacitor/services/telegram" "github.com/influxdata/kapacitor/services/victorops" @@ -110,6 +111,9 @@ type Service struct { SMTPService interface { Handler(smtp.HandlerConfig, *log.Logger) alert.Handler } + SNMPTrapService interface { + Handler(snmptrap.HandlerConfig, *log.Logger) (alert.Handler, error) + } TalkService interface { Handler(*log.Logger) alert.Handler } @@ -978,6 +982,17 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle } h := s.SMTPService.Handler(c, 
s.logger) ha = newPassThroughHandler(h) + case "snmp": + c := snmptrap.HandlerConfig{} + err = decodeOptions(spec.Options, &c) + if err != nil { + return + } + h, err := s.SNMPTrapService.Handler(c, s.logger) + if err != nil { + return nil, err + } + ha = newPassThroughHandler(h) case "talk": h := s.TalkService.Handler(s.logger) ha = newPassThroughHandler(h) diff --git a/services/snmptrap/config.go b/services/snmptrap/config.go index 975f288b63..adb36c48da 100644 --- a/services/snmptrap/config.go +++ b/services/snmptrap/config.go @@ -1,9 +1,6 @@ package snmptrap -import ( - "github.com/influxdata/kapacitor" - "github.com/pkg/errors" -) +import "errors" type Config struct { // Whether Snmptrap is enabled. @@ -14,28 +11,24 @@ type Config struct { TargetPort int `toml:"target-port" override:"target-port"` // SNMP Community Community string `toml:"community" override:"community"` - // SNMP Version - Version string `toml:"version" override:"version"` - // Whether all alerts should automatically post to snmptrap - Global bool `toml:"global" override:"global"` - // Whether all alerts should automatically use stateChangesOnly mode. - // Only applies if global is also set. 
- StateChangesOnly bool `toml:"state-changes-only" override:"state-changes-only"` + // Retries count for traps + Retries int `toml:"retries" override:"retries"` } func NewConfig() Config { return Config{ - Community: kapacitor.Product, - Version: "2c", + Community: "kapacitor", } } func (c Config) Validate() error { - if c.Enabled && c.TargetIp == "" { - return errors.New("must specify target-ip") - } - if c.Enabled && c.TargetPort <= 0 { - return errors.New("must specify target-port") + if c.Enabled { + if c.TargetIp == "" { + return errors.New("must specify target-ip") + } + if c.TargetPort <= 0 { + return errors.New("must specify target-port") + } } return nil } diff --git a/services/snmptrap/service.go b/services/snmptrap/service.go index 1165f47c36..50637e5712 100644 --- a/services/snmptrap/service.go +++ b/services/snmptrap/service.go @@ -1,19 +1,23 @@ package snmptrap import ( - "errors" + "bytes" "fmt" "log" + "net" "strconv" "sync/atomic" + text "text/template" - "github.com/influxdata/kapacitor" + "github.com/influxdata/kapacitor/alert" "github.com/k-sone/snmpgo" + "github.com/pkg/errors" ) type Service struct { - configValue atomic.Value - logger *log.Logger + configValue atomic.Value + snmpClientValue atomic.Value + logger *log.Logger } func NewService(c Config, l *log.Logger) *Service { @@ -21,20 +25,53 @@ func NewService(c Config, l *log.Logger) *Service { logger: l, } s.configValue.Store(c) + s.snmpClientValue.Store((*snmpgo.SNMP)(nil)) return s } func (s *Service) Open() error { + c := s.config() + if c.Enabled { + snmp, err := s.newSNMPClient(c) + if err != nil { + return err + } + s.snmpClientValue.Store(snmp) + } return nil } func (s *Service) Close() error { + snmp := s.snmpClient() + if snmp != nil { + snmp.Close() + } return nil } func (s *Service) config() Config { return s.configValue.Load().(Config) } +func (s *Service) snmpClient() *snmpgo.SNMP { + return s.snmpClientValue.Load().(*snmpgo.SNMP) +} + +func (s *Service) newSNMPClient(c 
Config) (*snmpgo.SNMP, error) { + address := net.JoinHostPort(c.TargetIp, strconv.Itoa(c.TargetPort)) + snmp, err := snmpgo.NewSNMP(snmpgo.SNMPArguments{ + Version: snmpgo.V2c, + Address: address, + Retries: uint(c.Retries), + Community: c.Community, + }) + if err != nil { + return nil, errors.Wrap(err, "invalid SNMP configuration") + } + if err := snmp.Open(); err != nil { + return nil, errors.Wrap(err, "failed to SNMP open connection") + } + return snmp, nil +} func (s *Service) Update(newConfig []interface{}) error { if l := len(newConfig); l != 1 { @@ -43,34 +80,42 @@ func (s *Service) Update(newConfig []interface{}) error { if c, ok := newConfig[0].(Config); !ok { return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0]) } else { - s.configValue.Store(c) + old := s.config() + if old != c { + if c.Enabled { + snmp, err := s.newSNMPClient(c) + if err != nil { + return err + } + s.snmpClientValue.Store(snmp) + } else { + snmp := s.snmpClient() + if snmp != nil { + snmp.Close() + } + s.snmpClientValue.Store((*snmpgo.SNMP)(nil)) + } + s.configValue.Store(c) + } } return nil } -func (s *Service) Global() bool { - c := s.config() - return c.Global -} - -func (s *Service) StateChangesOnly() bool { - c := s.config() - return c.StateChangesOnly -} - type testOptions struct { - TargetIp string `json:"target-ip"` - TargetPort int `json:"target-port"` - Community string `json:"community"` - Version string `json:"version"` - Message string `json:"message"` - Level kapacitor.AlertLevel `json:"level"` + TrapOid string `json:"trap-oid"` + Data Data `json:"data"` + Level alert.Level `json:"level"` } func (s *Service) TestOptions() interface{} { return &testOptions{ - Message: "test snmptrap message", - Level: kapacitor.CritAlert, + TrapOid: "1.1.1.1", + Data: Data{ + Oid: "1.1.1.1.2", + Type: "s", + Value: "test snmptrap message", + }, + Level: alert.Critical, } } @@ -79,56 +124,38 @@ func (s *Service) Test(options interface{}) error { if !ok { 
return fmt.Errorf("unexpected options type %T", options) } - trapOid := "1.1.1.1" - var dataList [][3]string - var data [3]string - data[0] = "1.1.1.1.2" - data[1] = "s" - data[2] = "test msg" - dataList = append(dataList, data) - return s.Alert(trapOid, dataList, o.Level) + return s.Alert(o.TrapOid, []Data{o.Data}, o.Level) } -func (s *Service) Alert(trapOid string, dataList [][3]string, level kapacitor.AlertLevel) error { +func (s *Service) Alert(trapOid string, dataList []Data, level alert.Level) error { c := s.config() - // SNMP target address - address := c.TargetIp + ":" + strconv.Itoa(c.TargetPort) - // SNMP version - var version snmpgo.SNMPVersion - switch c.Version { - case "1": - return errors.New("Version 1 not supported yet") - case "2c": - version = snmpgo.V2c - case "3": - return errors.New("Version 3 not supported yet") - default: - return errors.New("Bad snmp version should be: '1', '2c' or '3'") - } - // Create SNMP client - snmp, err := snmpgo.NewSNMP(snmpgo.SNMPArguments{ - Version: version, - Address: address, - Retries: 1, - Community: c.Community, - }) + if !c.Enabled { + return errors.New("service is not enabled") + } - var varBinds snmpgo.VarBinds // Add trap oid - varBinds = append(varBinds, snmpgo.NewVarBind(snmpgo.OidSysUpTime, snmpgo.NewTimeTicks(1000))) - oid, _ := snmpgo.NewOid(trapOid) - varBinds = append(varBinds, snmpgo.NewVarBind(snmpgo.OidSnmpTrap, oid)) + oid, err := snmpgo.NewOid(trapOid) + if err != nil { + return errors.Wrapf(err, "invalid trap Oid %q", trapOid) + } + varBinds := snmpgo.VarBinds{ + // TODO why are we hardcoding a 1000 uptime value? 
+ snmpgo.NewVarBind(snmpgo.OidSysUpTime, snmpgo.NewTimeTicks(1000)), + snmpgo.NewVarBind(snmpgo.OidSnmpTrap, oid), + } + // Add Data for _, data := range dataList { - oidStr := data[0] - oidTypeRaw := data[1] - oid, _ := snmpgo.NewOid(oidStr) + oid, err := snmpgo.NewOid(data.Oid) + if err != nil { + return errors.Wrapf(err, "invalid data Oid %q", data.Oid) + } // http://docstore.mik.ua/orelly/networking_2ndEd/snmp/ch10_03.htm - switch oidTypeRaw { + switch data.Type { case "a": return errors.New("Snmptrap Datatype 'IP address' not supported") case "c": - oidValue, err := strconv.ParseInt(data[2], 10, 64) + oidValue, err := strconv.ParseInt(data.Value, 10, 64) if err != nil { return err } @@ -136,7 +163,7 @@ func (s *Service) Alert(trapOid string, dataList [][3]string, level kapacitor.Al case "d": return errors.New("Snmptrap Datatype 'Decimal string' not supported") case "i": - oidValue, err := strconv.ParseInt(data[2], 10, 64) + oidValue, err := strconv.ParseInt(data.Value, 10, 64) if err != nil { return err } @@ -146,10 +173,10 @@ func (s *Service) Alert(trapOid string, dataList [][3]string, level kapacitor.Al case "o": return errors.New("Snmptrap Datatype 'Object ID' not supported") case "s": - oidValue := []byte(data[2]) + oidValue := []byte(data.Value) varBinds = append(varBinds, snmpgo.NewVarBind(oid, snmpgo.NewOctetString(oidValue))) case "t": - oidValue, err := strconv.ParseInt(data[2], 10, 64) + oidValue, err := strconv.ParseInt(data.Value, 10, 64) if err != nil { return err } @@ -159,22 +186,68 @@ func (s *Service) Alert(trapOid string, dataList [][3]string, level kapacitor.Al case "x": return errors.New("Snmptrap Datatype 'Hexadecimal string' not supported") default: - return errors.New("Snmptrap Datatype not supported: " + oidTypeRaw) + return fmt.Errorf("Snmptrap Datatype not known: %v", data.Type) } } - if err = snmp.Open(); err != nil { - // Failed to open connection - fmt.Println(err) - return err + snmp := s.snmpClient() + if err = 
snmp.V2Trap(varBinds); err != nil { + return errors.Wrap(err, "failed to send SNMP trap") } - defer snmp.Close() + return nil +} - if err = snmp.V2Trap(varBinds); err != nil { - // Failed to request - fmt.Println(err) - return err +type HandlerConfig struct { + TrapOid string `mapstructure:"trap-oid"` + DataList []Data `mapstructure:"data-list"` +} + +type Data struct { + Oid string `mapstructure:"oid" json:"oid"` + Type string `mapstructure:"type" json:"type"` + Value string `mapstructure:"value" json:"value"` + tmpl *text.Template +} + +// handler provides the implementation of the alert.Handler interface for the SNMP trap service. +type handler struct { + s *Service + c HandlerConfig + logger *log.Logger +} + +// Handler creates a handler from the config. +func (s *Service) Handler(c HandlerConfig, l *log.Logger) (alert.Handler, error) { + // Compile data value templates + for i, d := range c.DataList { + tmpl, err := text.New("data").Parse(d.Value) + if err != nil { + return nil, err + } + c.DataList[i].tmpl = tmpl } + return &handler{ + s: s, + c: c, + logger: l, + }, nil +} - return nil +// Handle renders the data value templates for the event and sends the result as an SNMP trap. +func (h *handler) Handle(event alert.Event) { + // Execute templates + td := event.TemplateData() + var buf bytes.Buffer + for i, d := range h.c.DataList { + err := d.tmpl.Execute(&buf, td) + if err != nil { + h.logger.Println("E! failed to handle event", err) + return + } + h.c.DataList[i].Value = buf.String() + buf.Reset() + } + if err := h.s.Alert(h.c.TrapOid, h.c.DataList, event.State.Level); err != nil { + h.logger.Println("E! 
failed to handle event", err) + } } diff --git a/services/snmptrap/snmptraptest/snmptraptest.go b/services/snmptrap/snmptraptest/snmptraptest.go new file mode 100644 index 0000000000..a3f43204fb --- /dev/null +++ b/services/snmptrap/snmptraptest/snmptraptest.go @@ -0,0 +1,105 @@ +package snmptraptest + +import ( + "sync" + + "github.com/k-sone/snmpgo" +) + +type Server struct { + traps []Trap + + closed bool + mu sync.Mutex + wg sync.WaitGroup + srv *snmpgo.TrapServer +} + +func NewServer() (*Server, error) { + srv, err := snmpgo.NewTrapServer(snmpgo.ServerArguments{ + LocalAddr: "127.0.0.1:9162", + }) + if err != nil { + return nil, err + } + s := &Server{ + srv: srv, + } + s.srv.AddSecurity(&snmpgo.SecurityEntry{ + Version: snmpgo.V2c, + Community: "public", + }) + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.srv.Serve(s) + }() + return s, nil +} + +func (s *Server) Traps() []Trap { + s.mu.Lock() + defer s.mu.Unlock() + return s.traps +} + +func (s *Server) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return nil + } + s.closed = true + return s.srv.Close() +} + +func (s *Server) OnTRAP(trap *snmpgo.TrapRequest) { + s.mu.Lock() + defer s.mu.Unlock() + t := Trap{ + Pdu: convertPdu(trap.Pdu), + Error: trap.Error, + } + s.traps = append(s.traps, t) +} + +type Trap struct { + Pdu Pdu + Error error +} + +type Pdu struct { + Type snmpgo.PduType + ErrorStatus snmpgo.ErrorStatus + ErrorIndex int + VarBinds VarBinds +} + +func convertPdu(pdu snmpgo.Pdu) Pdu { + return Pdu{ + Type: pdu.PduType(), + ErrorStatus: pdu.ErrorStatus(), + ErrorIndex: pdu.ErrorIndex(), + VarBinds: convertVarBinds(pdu.VarBinds()), + } +} + +type VarBinds []VarBind + +type VarBind struct { + Oid string + Value string + Type string +} + +func convertVarBinds(vbs snmpgo.VarBinds) VarBinds { + binds := make(VarBinds, len(vbs)) + for i, vb := range vbs { + binds[i] = VarBind{ + Oid: vb.Oid.String(), + Value: vb.Variable.String(), + Type: vb.Variable.Type(), + } + } + return 
binds +} diff --git a/task_master.go b/task_master.go index 72c1117178..9a844b96e7 100644 --- a/task_master.go +++ b/task_master.go @@ -22,6 +22,7 @@ import ( "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/slack" "github.com/influxdata/kapacitor/services/smtp" + "github.com/influxdata/kapacitor/services/snmptrap" "github.com/influxdata/kapacitor/services/telegram" "github.com/influxdata/kapacitor/services/victorops" "github.com/influxdata/kapacitor/tick" @@ -99,10 +100,8 @@ type TaskMaster struct { StateChangesOnly() bool Handler(slack.HandlerConfig, *log.Logger) alert.Handler } - SnmpTrapService interface { - Global() bool - StateChangesOnly() bool - Alert(trapOid string, dataList [][3]string, level AlertLevel) error + SNMPTrapService interface { + Handler(snmptrap.HandlerConfig, *log.Logger) (alert.Handler, error) } TelegramService interface { Global() bool @@ -204,7 +203,7 @@ func (tm *TaskMaster) New(id string) *TaskMaster { n.PagerDutyService = tm.PagerDutyService n.SlackService = tm.SlackService n.TelegramService = tm.TelegramService - n.SnmpTrapService = tm.SnmpTrapService + n.SNMPTrapService = tm.SNMPTrapService n.HipChatService = tm.HipChatService n.AlertaService = tm.AlertaService n.SensuService = tm.SensuService