From 0c77c133a281b898e1ed58cf5742d317f287e163 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Mon, 25 Jun 2018 14:27:52 -0600 Subject: [PATCH] Fix KafkaTopic not working from TICKScript --- CHANGELOG.md | 4 +++ alert.go | 2 +- integrations/streamer_test.go | 66 +++++++++++++++++++++++++++++++++++ server/server_test.go | 2 +- 4 files changed, 72 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 706160c87..da3e2e8a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v1.5.1 [unreleased] + +- [#1982](https://github.com/influxdata/kapacitor/pull/1982): Fix KafkaTopic not working from TICKscript + ## v1.5.0 [2018-05-17] ### Features diff --git a/alert.go b/alert.go index 2ea9f9b77..294aaf503 100644 --- a/alert.go +++ b/alert.go @@ -319,7 +319,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a for _, k := range n.KafkaHandlers { c := kafka.HandlerConfig{ Cluster: k.Cluster, - Topic: k.Topic, + Topic: k.KafkaTopic, Template: k.Template, } h, err := et.tm.KafkaService.Handler(c, ctx...) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index b6771be70..c382bdbf2 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -46,6 +46,8 @@ import ( "github.com/influxdata/kapacitor/services/httppost/httpposttest" k8s "github.com/influxdata/kapacitor/services/k8s/client" "github.com/influxdata/kapacitor/services/k8s/k8stest" + "github.com/influxdata/kapacitor/services/kafka" + "github.com/influxdata/kapacitor/services/kafka/kafkatest" "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie/opsgenietest" "github.com/influxdata/kapacitor/services/opsgenie2" @@ -8543,6 +8545,70 @@ stream } } +func TestStream_AlertKafka(t *testing.T) { + ts, err := kafkatest.NewServer() + if err != nil { + t.Fatal(err) + } + defer ts.Close() + + var script = ` +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |window() + .period(10s) + .every(10s) + |count('value') + |alert() + .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') + .info(lambda: "count" > 6.0) + .warn(lambda: "count" > 7.0) + .crit(lambda: "count" > 8.0) + .kafka() + .cluster('default') + .kafkaTopic('testTopic') + .template('{{.Message}}') +` + + tmInit := func(tm *kapacitor.TaskMaster) { + configs := kafka.Configs{{ + Enabled: true, + ID: "default", + Brokers: []string{ts.Addr.String()}, + }} + d := diagService.NewKafkaHandler().WithContext(keyvalue.KV("test", "kafka")) + tm.KafkaService = kafka.NewService(configs, d) + } + testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit) + + exp := []interface{}{ + kafkatest.Message{ + Topic: "testTopic", + Partition: 1, + Offset: 0, + Key: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is CRITICAL", + }, + } + + ts.Close() + msgs, err := ts.Messages() + if err != nil { + t.Fatal(err) + } + got := make([]interface{}, len(msgs)) + for i, m := range msgs { + got[i] = m + } + + if err := compareListIgnoreOrder(got, exp, nil); err != nil { + t.Error(err) + } +} + func TestStream_AlertTelegram(t *testing.T) { ts := telegramtest.NewServer() defer ts.Close() diff --git a/server/server_test.go b/server/server_test.go index febdc98e3..712150ab2 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -9624,7 +9624,7 @@ func TestServer_AlertHandlers(t *testing.T) { Message: string(adJSON) + "\n", }} if !cmp.Equal(exp, got) { - return fmt.Errorf("unexpected kafak messages -exp/+got:\n%s", cmp.Diff(exp, got)) + return fmt.Errorf("unexpected kafka messages -exp/+got:\n%s", cmp.Diff(exp, got)) } return nil },