Skip to content

Commit

Permalink
Merge pull request #2055 from nermolaev/master
Browse files Browse the repository at this point in the history
Alerta support for correlate with templating
  • Loading branch information
docmerlin authored Dec 1, 2020
2 parents 0ea2776 + 9773eea commit 8a8d4b5
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

### Features
- [#2055](https://github.com/influxdata/kapacitor/pull/2055): Add support for correlate in the Alerta AlertNode, thanks @nermolaev!
- [#2409](https://github.com/influxdata/kapacitor/pull/2409): Optionally use kapacitor alert details as opsgenie description text, thanks @JamesClonk!

## v1.5.7 [2020-10-27]
Expand Down
3 changes: 3 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
if len(a.Service) != 0 {
c.Service = a.Service
}
if len(a.Correlate) != 0 {
c.Correlate = a.Correlate
}
if a.Timeout != 0 {
c.Timeout = a.Timeout
}
Expand Down
3 changes: 3 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9089,6 +9089,7 @@ stream
.group('{{ .ID }}')
.value('{{ index .Fields "count" }}')
.services('serviceA', 'serviceB', '{{ .Name }}')
.correlated('{{ .Name }}')
`
tmInit := func(tm *kapacitor.TaskMaster) {
c := alerta.NewConfig()
Expand All @@ -9112,6 +9113,7 @@ stream
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "Kapacitor",
Service: []string{"cpu"},
Correlate: []string{"cpu"},
Timeout: 3600,
},
},
Expand All @@ -9126,6 +9128,7 @@ stream
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "override",
Service: []string{"serviceA", "serviceB", "cpu"},
Correlate: []string{"cpu"},
Value: "10",
Timeout: 86400,
},
Expand Down
8 changes: 8 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,9 @@ type AlertaHandler struct {
// tick:ignore
Service []string `tick:"Services" json:"service"`

// List of Correlated
Correlate []string `tick:"Correlated" json:"correlate"`

// Alerta timeout.
// Default: 24h
Timeout time.Duration `json:"timeout"`
Expand All @@ -1269,6 +1272,11 @@ func (a *AlertaHandler) Services(service ...string) *AlertaHandler {
return a
}

func (a *AlertaHandler) Correlated(correlate ...string) *AlertaHandler {
a.Correlate = correlate
return a
}

// Send alert to an MQTT broker
// tick:property
func (n *AlertNodeData) Mqtt(topic string) *MQTTHandler {
Expand Down
1 change: 1 addition & 0 deletions pipeline/tick/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) {
Dot("value", h.Value).
Dot("origin", h.Origin).
Dot("services", args(h.Service)...).
Dot("correlated", args(h.Correlate)...).
Dot("timeout", h.Timeout)
}

Expand Down
2 changes: 2 additions & 0 deletions pipeline/tick/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ func TestAlertAlerta(t *testing.T) {
handler.Value = "Save the Galaxy"
handler.Origin = "Omega"
handler.Services("legion", "vent", "garrus", "distraction team", "grunt", "crew", "samara", "barrier")
handler.Correlated("Harbinger")
handler.Timeout = 10 * time.Second

want := `stream
Expand All @@ -589,6 +590,7 @@ func TestAlertAlerta(t *testing.T) {
.value('Save the Galaxy')
.origin('Omega')
.services('legion', 'vent', 'garrus', 'distraction team', 'grunt', 'crew', 'samara', 'barrier')
.correlated('Harbinger')
.timeout(10s)
`
PipelineTickTestHelper(t, pipe, want)
Expand Down
5 changes: 5 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8844,6 +8844,10 @@ func TestServer_ListServiceTests(t *testing.T) {
"testServiceA",
"testServiceB",
},
"correlate": []interface{}{
"testServiceX",
"testServiceY",
},
"timeout": "24h0m0s",
},
},
Expand Down Expand Up @@ -9812,6 +9816,7 @@ func TestServer_AlertHandlers(t *testing.T) {
Text: "message",
Origin: "kapacitor",
Service: []string{"alert"},
Correlate: []string{"alert"},
Timeout: 86400,
},
}}
Expand Down
1 change: 1 addition & 0 deletions services/alerta/alertatest/alertatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type PostData struct {
Text string `json:"text"`
Origin string `json:"origin"`
Service []string `json:"service"`
Correlate []string `json:"correlate"`
Value string `json:"value"`
Timeout int64 `json:"timeout"`
}
43 changes: 40 additions & 3 deletions services/alerta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type testOptions struct {
Message string `json:"message"`
Origin string `json:"origin"`
Service []string `json:"service"`
Correlate []string `json:"correlate"`
Timeout string `json:"timeout"`
}

Expand All @@ -76,6 +77,7 @@ func (s *Service) TestOptions() interface{} {
Message: "test alerta message",
Origin: c.Origin,
Service: []string{"testServiceA", "testServiceB"},
Correlate: []string{"testServiceX", "testServiceY"},
Timeout: "24h0m0s",
}
}
Expand All @@ -99,6 +101,7 @@ func (s *Service) Test(options interface{}) error {
o.Message,
o.Origin,
o.Service,
o.Correlate,
timeout,
map[string]string{},
models.Result{},
Expand Down Expand Up @@ -133,12 +136,12 @@ func (s *Service) Update(newConfig []interface{}) error {
return nil
}

func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, tags map[string]string, data models.Result) error {
func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, correlate []string, timeout time.Duration, tags map[string]string, data models.Result) error {
if resource == "" || event == "" {
return errors.New("Resource and Event are required to send an alert")
}

req, err := s.preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin, service, timeout, tags, data)
req, err := s.preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin, service, correlate, timeout, tags, data)
if err != nil {
return err
}
Expand Down Expand Up @@ -166,7 +169,7 @@ func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severi
return nil
}

func (s *Service) preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, tags map[string]string, data models.Result) (*http.Request, error) {
func (s *Service) preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, correlate []string, timeout time.Duration, tags map[string]string, data models.Result) (*http.Request, error) {
c := s.config()

if !c.Enabled {
Expand Down Expand Up @@ -212,6 +215,9 @@ func (s *Service) preparePost(token, tokenPrefix, resource, event, environment,
if len(service) > 0 {
postData["service"] = service
}
if len(correlate) > 0 {
postData["correlate"] = correlate
}
postData["timeout"] = int64(timeout / time.Second)

tagList := make([]string, 0)
Expand Down Expand Up @@ -278,6 +284,9 @@ type HandlerConfig struct {
// List of effected Services
Service []string `mapstructure:"service"`

// List of correlated events
Correlate []string `mapstructure:"correlate"`

// Alerta timeout.
// Default: 24h
Timeout time.Duration `mapstructure:"timeout"`
Expand All @@ -294,6 +303,7 @@ type handler struct {
valueTmpl *text.Template
groupTmpl *text.Template
serviceTmpl []*text.Template
correlateTmpl []*text.Template
}

func (s *Service) DefaultHandlerConfig() HandlerConfig {
Expand Down Expand Up @@ -337,6 +347,15 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er
stmpl = append(stmpl, tmpl)
}

var ctmpl []*text.Template
for _, correlate := range c.Correlate {
tmpl, err := text.New("correlate").Parse(correlate)
if err != nil {
return nil, err
}
ctmpl = append(ctmpl, tmpl)
}

return &handler{
s: s,
c: c,
Expand All @@ -347,6 +366,7 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er
groupTmpl: gtmpl,
valueTmpl: vtmpl,
serviceTmpl: stmpl,
correlateTmpl: ctmpl,
}, nil
}

Expand Down Expand Up @@ -430,6 +450,22 @@ func (h *handler) Handle(event alert.Event) {
buf.Reset()
}
}
buf.Reset()

var correlate []string
if len(h.correlateTmpl) == 0 {
correlate = []string{td.Name}
} else {
for _, tmpl := range h.correlateTmpl {
err = tmpl.Execute(&buf, td)
if err != nil {
h.diag.TemplateError(err, keyvalue.KV("correlate", tmpl.Name()))
return
}
correlate = append(correlate, buf.String())
buf.Reset()
}
}

var severity string

Expand Down Expand Up @@ -458,6 +494,7 @@ func (h *handler) Handle(event alert.Event) {
event.State.Message,
h.c.Origin,
service,
correlate,
h.c.Timeout,
event.Data.Tags,
event.Data.Result,
Expand Down

0 comments on commit 8a8d4b5

Please sign in to comment.