Skip to content

Commit

Permalink
feat(alerts): Allow for compact json in templates and BP (#2608)
Browse files Browse the repository at this point in the history
* fix(bigpanda): compact bigpanda and template alert

* feat(alert): jsonCompact templating function

* feat(alert): add a jsonCompact to more template types and tests
  • Loading branch information
docmerlin authored Sep 23, 2021
1 parent 5e8a2da commit b4091b8
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 43 deletions.
29 changes: 25 additions & 4 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type AlertNode struct {

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (an *AlertNode, err error) {
const oneMeg = 2 << 19
ctx := []keyvalue.T{
keyvalue.KV("task", et.Task.ID),
}
Expand Down Expand Up @@ -119,16 +120,36 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
}

an.detailsTmpl, err = html.New("details").Funcs(html.FuncMap{
"json": func(v interface{}) html.JS {

"jsonCompact": func(v interface{}) html.JS {
tmpBuffer := an.bufPool.Get().(*bytes.Buffer)
tmpBuffer2 := an.bufPool.Get().(*bytes.Buffer)

defer func() {
tmpBuffer.Reset()
an.bufPool.Put(tmpBuffer)
if tmpBuffer.Cap() < oneMeg { // only reuse the buffer if it is less than 500kb
tmpBuffer.Reset()
an.bufPool.Put(tmpBuffer)
}
if tmpBuffer2.Cap() < oneMeg { // only reuse the buffer if it is less than 500kb
tmpBuffer2.Reset()
an.bufPool.Put(tmpBuffer2)
}
}()

_ = json.NewEncoder(tmpBuffer).Encode(v)
_ = json.Compact(tmpBuffer2, tmpBuffer.Bytes())
return html.JS(tmpBuffer2.String())
},
"json": func(v interface{}) html.JS {
tmpBuffer := an.bufPool.Get().(*bytes.Buffer)

defer func() {
if tmpBuffer.Cap() < oneMeg { // only reuse the buffer if it is less than 500kb
tmpBuffer.Reset()
an.bufPool.Put(tmpBuffer)
}
}()

_ = json.NewEncoder(tmpBuffer).Encode(v)
return html.JS(tmpBuffer.String())
},
}).Parse(n.Details)
Expand Down
233 changes: 233 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10197,6 +10197,239 @@ stream
}
}

func TestServer_AlertJSON(t *testing.T) {
postServer := func(t *testing.T, expected string) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
t.Error(err)
}
if (string(data)) != expected {
t.Error(
"Unexpected httppost request",
cmp.Diff(string(data), expected))
}
}))
}

testCases := []struct {
name string
setup func(*testing.T, *server.Config, string) (*httptest.Server, map[string]interface{}, error)
expected string
tick string
}{
{
name: "normal_json",
setup: func(t *testing.T, c *server.Config, expected string) (*httptest.Server, map[string]interface{}, error) {
ts := postServer(t, expected)
ha := &client.TopicHandler{
Kind: "post",
}
ha.Options = map[string]interface{}{
"url": ts.URL,
"endpoint": "test",
}
c.HTTPPost = httppost.Configs{{
Endpoint: "test",
URLTemplate: ts.URL,
Headers: nil,
AlertTemplate: "{{ json . }}",
}}
return ts, ha.Options, nil
},
expected: `{"id":"id","message":"message","details":"details","time":"1970-01-01T00:00:00Z",` +
`"duration":0,"level":"CRITICAL","data":{"series":[{"name":"alert","columns":["time","value"],` +
`"values":[["1970-01-01T00:00:00Z",1]]}]},"previousLevel":"OK","recoverable":true}` +
"\n",
tick: `stream
|from()
.measurement('alert')
|alert()
.topic('test')
.id('id')
.message('message')
.details('details')
.crit(lambda: TRUE)`,
},
{
name: "compact-json",
setup: func(t *testing.T, c *server.Config, expected string) (*httptest.Server, map[string]interface{}, error) {
ts := postServer(t, expected)
ha := &client.TopicHandler{
Kind: "post",
}
ha.Options = map[string]interface{}{
"url": ts.URL,
"endpoint": "test",
}
c.HTTPPost = httppost.Configs{{
Endpoint: "test",
URLTemplate: ts.URL,
Headers: nil,
AlertTemplate: "{{ jsonCompact . }}", // note this gets ignored by the testing framework
}}
return ts, ha.Options, nil
},
expected: `{"id":"id","message":"message","details":"details","time":"1970-01-01T00:00:00Z","duration":0,"level":"CRITICAL","data":{"series":[{"name":"alert","columns":["time","value"],"values":[["1970-01-01T00:00:00Z",1]]}]},"previousLevel":"OK","recoverable":true}`,
tick: `stream
|from()
.measurement('alert')
|alert()
.topic('test')
.id('id')
.message('message')
.details('details')
.crit(lambda: TRUE)`,
},
{
name: "details_normal_json",
setup: func(t *testing.T, c *server.Config, expected string) (*httptest.Server, map[string]interface{}, error) {
ts := postServer(t, expected)
ha := &client.TopicHandler{
Kind: "post",
}
ha.Options = map[string]interface{}{
"url": ts.URL,
"endpoint": "test",
}
c.HTTPPost = httppost.Configs{{
Endpoint: "test",
URLTemplate: ts.URL,
Headers: nil,
AlertTemplate: "{{ json . }}",
}}
return ts, ha.Options, nil
},
expected: `{"id":"id","message":"message","details":` +
`"{\u0026#34;Name\u0026#34;:\u0026#34;alert\u0026#34;,\u0026#34;Ta` +
`skName\u0026#34;:\u0026#34;testAlertHandlers\u0026#34;,\u0026#34` +
`;Group\u0026#34;:\u0026#34;nil\u0026#34;,\u0026#34;Tags\u0026#34` +
`;:{},\u0026#34;ServerInfo\u0026#34;:{\u0026#34;Hostname\u0026#34` +
`;:\u0026#34;localhost\u0026#34;,\u0026#34;ClusterID\u0026#34;:\u` +
`0026#34;4c77777-a43d-439a-ba56-b08cbb0993fe\u0026#34;,\u0026#34` +
`;ServerID\u0026#34;:\u0026#34;af80a060-3226-41e2-b1cd-548a1467b491` +
`91\u0026#34;},\u0026#34;ID\u0026#34;:\u0026#34;id\u0026#34;,\u00` +
`26#34;Fields\u0026#34;:{\u0026#34;value\u0026#34;:1},\u0026#34;L` +
`evel\u0026#34;:\u0026#34;CRITICAL\u0026#34;,\u0026#34;Time\u0026` +
`#34;:\u0026#34;1970-01-01T00:00:00Z\u0026#34;,\u0026#34;Duration` +
`\u0026#34;:0,\u0026#34;Message\u0026#34;:\u0026#34;message\u0026` +
`#34;}\n",` +
`"time":"1970-01-01T00:00:00Z","duration":0,"level":"CRITICAL",` +
`"data":{"series":[{"name":"alert","columns":["time","value"],` +
`"values":[["1970-01-01T00:00:00Z",1]]}]},"previousLevel":"OK","recoverable":true}`,
tick: `stream
|from()
.measurement('alert')
|alert()
.topic('test')
.id('id')
.message('message')
.details('{{ json .data }}')
.crit(lambda: TRUE)`,
},
{
name: "details_compact_json",
setup: func(t *testing.T, c *server.Config, expected string) (*httptest.Server, map[string]interface{}, error) {
ts := postServer(t, expected)
ha := &client.TopicHandler{
Kind: "post",
}
ha.Options = map[string]interface{}{
"url": ts.URL,
"endpoint": "test",
}
c.HTTPPost = httppost.Configs{{
Endpoint: "test",
URLTemplate: ts.URL,
Headers: nil,
AlertTemplate: "{{ json . }}",
}}
return ts, ha.Options, nil
},
expected: `{"id":"id","message":"message","details":` +
`"{\u0026#34;Name\u0026#34;:\u0026#34;alert\u0026#34;,\u0026#34;Ta` +
`skName\u0026#34;:\u0026#34;testAlertHandlers\u0026#34;,\u0026#34` +
`;Group\u0026#34;:\u0026#34;nil\u0026#34;,\u0026#34;Tags\u0026#34` +
`;:{},\u0026#34;ServerInfo\u0026#34;:{\u0026#34;Hostname\u0026#34` +
`;:\u0026#34;localhost\u0026#34;,\u0026#34;ClusterID\u0026#34;:\u` +
`0026#34;f4c77777-a43d-439a-ba56-b08cbb0993fe\u0026#34;,\u0026#34` +
`;ServerID\u0026#34;:\u0026#34;af80a060-3226-41e2-b1cd-548a1467b4` +
`91\u0026#34;},\u0026#34;ID\u0026#34;:\u0026#34;id\u0026#34;,\u00` +
`26#34;Fields\u0026#34;:{\u0026#34;value\u0026#34;:1},\u0026#34;L` +
`evel\u0026#34;:\u0026#34;CRITICAL\u0026#34;,\u0026#34;Time\u0026` +
`#34;:\u0026#34;1970-01-01T00:00:00Z\u0026#34;,\u0026#34;Duration` +
`\u0026#34;:0,\u0026#34;Message\u0026#34;:\u0026#34;message\u0026` +
`#34;},` +
`"time":"1970-01-01T00:00:00Z","duration":0,"level":"CRITICAL",` +
`"data":{"series":[{"name":"alert","columns":["time","value"],` +
`"values":[["1970-01-01T00:00:00Z",1]]}]},"previousLevel":"OK","recoverable":true}` +
"\n",
tick: `stream
|from()
.measurement('alert')
|alert()
.topic('test')
.id('id')
.message('message')
.details('{{ jsonCompact .data }}')
.crit(lambda: TRUE)`,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

// Create default config
c := NewConfig()
var options map[string]interface{}
ctxt := context.Background()
var ts *httptest.Server
var err error
ts, options, err = tc.setup(t, c, tc.expected)
if err != nil {
t.Fatal(err)
}
defer ts.Close()
s := OpenServer(c)
cli := Client(s)
defer func() {
s.Close()
}()
ctxt = context.WithValue(ctxt, "kapacitorURL", s.URL())

_, err = cli.CreateTopicHandler(cli.TopicHandlersLink("test"), client.TopicHandlerOptions{
ID: "testJSONHandlers",
Kind: "post",
Options: options,
})
if err != nil {
t.Fatal(err)
}

if _, err := cli.CreateTask(client.CreateTaskOptions{
ID: "testAlertHandlers",
Type: client.StreamTask,
DBRPs: []client.DBRP{{
Database: "mydb",
RetentionPolicy: "myrp",
}},
TICKscript: tc.tick,
Status: client.Enabled,
}); err != nil {
t.Fatalf("%s: %v", tc.name, err)
}

point := "alert value=1 0000000000"
v := url.Values{}
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", point, v)

})
}

}

func TestServer_AlertHandlers(t *testing.T) {

resultJSON := `{"series":[{"name":"alert","columns":["time","value"],"values":[["1970-01-01T00:00:00Z",1]]}]}`
Expand Down
3 changes: 1 addition & 2 deletions services/alert/dao_easyjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions services/bigpanda/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,15 @@ func (s *Service) preparePost(id string, message string, details string, level a
}
}

var post bytes.Buffer
enc := json.NewEncoder(&post)
var postTemp bytes.Buffer
enc := json.NewEncoder(&postTemp)
if err := enc.Encode(bpData); err != nil {
return nil, err
}
var post bytes.Buffer
if err := json.Compact(&post, postTemp.Bytes()); err != nil {
return nil, err
}

bpUrl := hc.URL
if bpUrl == "" {
Expand Down
Loading

0 comments on commit b4091b8

Please sign in to comment.