Skip to content

Commit

Permalink
initial rework for the new alert system
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 6, 2017
1 parent e2c44d1 commit 95c591b
Show file tree
Hide file tree
Showing 10 changed files with 477 additions and 617 deletions.
387 changes: 20 additions & 367 deletions alert.go

Large diffs are not rendered by default.

276 changes: 143 additions & 133 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"text/template"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/client"
"github.com/influxdata/influxdb/influxql"
imodels "github.com/influxdata/influxdb/models"
Expand Down Expand Up @@ -46,6 +47,7 @@ import (
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/smtp/smtptest"
"github.com/influxdata/kapacitor/services/snmptrap"
"github.com/influxdata/kapacitor/services/snmptrap/snmptraptest"
"github.com/influxdata/kapacitor/services/storage/storagetest"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/talk/talktest"
Expand Down Expand Up @@ -7348,60 +7350,13 @@ stream
for i := range cmds {
cmdsI[i] = cmds[i]
}
if err := compareListIgnoreOrder(cmdsI, expCmds, func(got, exp interface{}) error {
if err := compareListIgnoreOrder(cmdsI, expCmds, func(got, exp interface{}) bool {
g := got.(*commandtest.Command)
e := exp.(*commandtest.Command)
return e.Compare(g)
return e.Compare(g) == nil
}); err != nil {
t.Error(err)
}

//for i := 0; i < 2; i++ {
// select {
// case cmd := <-cmdC:
// cmd.Lock()
// defer cmd.Unlock()

// var err error
// for j, info := range expInfo {
// if got, exp := cmd.Info, info; reflect.DeepEqual(got, exp) {
// // Found match remove it
// if j == 0 {
// expInfo = expInfo[1:]
// } else {
// expInfo = expInfo[:1]
// }
// err = nil
// break
// } else {
// err = fmt.Errorf("%d unexpected command info:\ngot\n%+v\nexp\n%+v\n", i, got, exp)
// }
// }
// if err != nil {
// t.Error(err)
// }

// if !cmd.Started {
// t.Errorf("%d expected command to have been started", i)
// }
// if !cmd.Waited {
// t.Errorf("%d expected command to have waited", i)
// }
// if cmd.Killed {
// t.Errorf("%d expected command not to have been killed", i)
// }

// ad := alertservice.AlertData{}
// if err := json.Unmarshal(cmd.StdinData, &ad); err != nil {
// t.Fatal(err)
// }
// if got, exp := ad, expAD; !reflect.DeepEqual(got, exp) {
// t.Errorf("%d unexpected alert data sent to command:\ngot\n%+v\nexp\n%+v\n%s", i, got, exp, string(cmd.StdinData))
// }
// default:
// t.Error("expected command to be created")
// }
//}
}

func TestStream_AlertEmail(t *testing.T) {
Expand Down Expand Up @@ -7508,6 +7463,138 @@ Value: 10
}
}

func TestStream_AlertSNMPTrap(t *testing.T) {

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.snmpTrap('1.1.1')
.data('1.1.1.2', 'c', '1')
.data('1.1.1.2', 's', 'SNMP ALERT')
.data('1.1.1.2', 's', '{{.Message}}')
.snmpTrap('1.1.2')
.data('1.1.2.3', 'i', '10')
.data('1.1.2.3', 'n', '')
.data('1.1.2.3', 't', '20000')
`

expTraps := []interface{}{
snmptraptest.Trap{
Pdu: snmptraptest.Pdu{
Type: snmpgo.SNMPTrapV2,
ErrorStatus: snmpgo.NoError,
VarBinds: snmptraptest.VarBinds{
{
Oid: "1.3.6.1.2.1.1.3.0",
Value: "1000",
Type: "TimeTicks",
},
{
Oid: "1.3.6.1.6.3.1.1.4.1.0",
Value: "1.1.1",
Type: "Oid",
},
{
Oid: "1.1.1.2",
Value: "1",
Type: "Counter64",
},
{
Oid: "1.1.1.2",
Value: "SNMP ALERT",
Type: "OctetString",
},
{
Oid: "1.1.1.2",
Value: "kapacitor/cpu/serverA is CRITICAL",
Type: "OctetString",
},
},
},
},
snmptraptest.Trap{
Pdu: snmptraptest.Pdu{
Type: snmpgo.SNMPTrapV2,
ErrorStatus: snmpgo.NoError,
VarBinds: snmptraptest.VarBinds{
{
Oid: "1.3.6.1.2.1.1.3.0",
Value: "1000",
Type: "TimeTicks",
},
{
Oid: "1.3.6.1.6.3.1.1.4.1.0",
Value: "1.1.2",
Type: "Oid",
},
{
Oid: "1.1.2.3",
Value: "10",
Type: "Integer",
},
{
Oid: "1.1.2.3",
Value: "",
Type: "Null",
},
{
Oid: "1.1.2.3",
Value: "20000",
Type: "TimeTicks",
},
},
},
},
}

snmpServer, err := snmptraptest.NewServer()
if err != nil {
t.Fatal(err)
}
defer snmpServer.Close()

c := snmptrap.NewConfig()
c.Enabled = true
c.TargetIp = "127.0.0.1"
c.TargetPort = 9162
c.Community = "public"
c.Retries = 2
st := snmptrap.NewService(c, logService.NewLogger("[test_snmptrap] ", log.LstdFlags))
if err := st.Open(); err != nil {
t.Fatal(err)
}
defer st.Close()

tmInit := func(tm *kapacitor.TaskMaster) {
tm.SNMPTrapService = st
}

testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit)

snmpServer.Close()

traps := snmpServer.Traps()
got := make([]interface{}, len(traps))
for i, t := range traps {
got[i] = t
}
if err := compareListIgnoreOrder(got, expTraps, nil); err != nil {
t.Error(err)
}
}

func TestStream_AlertSigma(t *testing.T) {
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -8443,108 +8530,31 @@ func testStreamerWithOutput(
}
}

func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface{}) error) error {
func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface{}) bool) error {
if len(got) != len(exp) {
return fmt.Errorf("unexpected count got %d exp %d", len(got), len(exp))
return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp))
}

if cmpF == nil {
cmpF = func(got, exp interface{}) error {
cmpF = func(got, exp interface{}) bool {
if !reflect.DeepEqual(got, exp) {
return fmt.Errorf("\ngot\n%+v\nexp\n%+v\n", got, exp)
return false
}
return nil
return true
}
}

for _, e := range exp {
found := false
var err error
for _, g := range got {
if err = cmpF(g, e); err == nil {
if cmpF(g, e) {
found = true
break
}
}
if !found {
return err
return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp))
}
}
return nil
}

// SNMPTrap

type TrapListener struct {
data string
}

func (l *TrapListener) OnTRAP(trap *snmpgo.TrapRequest) {
l.data = trap.Pdu.String()
}

func NewTrapListener(data *string) *TrapListener {
return &TrapListener{*data}
}

func TestStream_AlertSnmpTrap(t *testing.T) {
trapData := ""
server, err := snmpgo.NewTrapServer(snmpgo.ServerArguments{
LocalAddr: "127.0.0.1:9162",
})
if err != nil {
log.Fatal(err)
}
// V2c
err = server.AddSecurity(&snmpgo.SecurityEntry{
Version: snmpgo.V2c,
Community: "public",
})
if err != nil {
log.Fatal(err)
}
go server.Serve(NewTrapListener(&trapData))

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.snmptrap('1.1.1')
.data('1.1.1.2', 's', 'SNMP ALERT')
`

clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script, nil)
defer tm.Close()

c := snmptrap.NewConfig()
c.Enabled = true
c.TargetIp = "127.0.0.1"
c.TargetPort = 9162
c.Community = "public"
c.Version = "2c"
st := snmptrap.NewService(c, logService.NewLogger("[test_snmptrap] ", log.LstdFlags))
tm.SnmpTrapService = st

err = fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
if err != nil {
t.Error(err)
}

// Stop server
server.Close()

if trapData == "" {
t.Errorf("No SNMP Trap data received")
}
}
28 changes: 19 additions & 9 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ type AlertNode struct {

// Send alert using SNMPtraps.
// tick:ignore
SnmpTrapHandlers []*SnmpTrapHandler `tick:"SnmpTrap"`
SNMPTrapHandlers []*SNMPTrapHandler `tick:"SnmpTrap"`
}

func newAlertNode(wants EdgeType) *AlertNode {
Expand Down Expand Up @@ -1265,25 +1265,31 @@ type TalkHandler struct {
// Send alerts to `target-ip:target-port` on OID '1.3.6.1.2.1.1.7'
//
// tick:property
func (a *AlertNode) SnmpTrap(trapOid string, dataList ...[3]string) *SnmpTrapHandler {
snmpTrap := &SnmpTrapHandler{
func (a *AlertNode) SnmpTrap(trapOid string) *SNMPTrapHandler {
snmpTrap := &SNMPTrapHandler{
AlertNode: a,
TrapOid: trapOid,
DataList: dataList,
}
a.SnmpTrapHandlers = append(a.SnmpTrapHandlers, snmpTrap)
a.SNMPTrapHandlers = append(a.SNMPTrapHandlers, snmpTrap)
return snmpTrap
}

// SNMPTrap AlertHandler
// tick:embedded:AlertNode.SnmpTrap
type SnmpTrapHandler struct {
type SNMPTrapHandler struct {
*AlertNode

TrapOid string
// List of trap data.
// tick:ignore
DataList [][3]string `tick:"Data"`
DataList []SNMPData `tick:"Data"`
}

// tick:ignore
type SNMPData struct {
Oid string
Type string
Value string
}

// Define Data for SNMP Trap alert.
Expand All @@ -1306,9 +1312,13 @@ type SnmpTrapHandler struct {
// .data('1.3.6.1.4.1..1.7', 's', '{{ index .Fields "used_percent" }}' )
//
// tick:property
func (h *SnmpTrapHandler) Data(oid, rawType, value string) *SnmpTrapHandler {
func (h *SNMPTrapHandler) Data(oid, t, value string) *SNMPTrapHandler {
// TODO check element validity
data := [3]string{oid, rawType, value}
data := SNMPData{
Oid: oid,
Type: t,
Value: value,
}
h.DataList = append(h.DataList, data)
return h
}
Loading

0 comments on commit 95c591b

Please sign in to comment.