Skip to content

Commit

Permalink
Merge pull request #1612 from influxdata/md-issue#1547
Browse files Browse the repository at this point in the history
Fix panic with replaying a recording
  • Loading branch information
desa authored Oct 12, 2017
2 parents 69c8876 + 5a1e612 commit 6e45b39
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- [#1470](https://github.com/influxdata/kapacitor/pull/1470): Fix error messages for missing fields which are arguments to functions are not clear
- [#1516](https://github.com/influxdata/kapacitor/pull/1516): Fix bad PagerDuty test the required server info.
- [#1581](https://github.com/influxdata/kapacitor/pull/1581): Add SNMP sysUpTime to SNMP Trap service
- [#1547](https://github.com/influxdata/kapacitor/issues/1547): Fix panic on recording replay with HTTPPostHandler.

## v1.3.3 [2017-08-11]

Expand Down
207 changes: 207 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3440,6 +3440,213 @@ test value=1 0000000012
}
}

func TestServer_RecordReplayStreamWithPost(t *testing.T) {
s, cli := OpenDefaultServer()
defer s.Close()

id := "testStreamTask"
ttype := client.StreamTask
dbrps := []client.DBRP{{
Database: "mydb",
RetentionPolicy: "myrp",
}}

tmpDir, err := ioutil.TempDir("", "testStreamTaskRecording")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
tick := `stream
|from()
.measurement('test')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('test-count')
.message('{{ .ID }} got: {{ index .Fields "count" }}')
.crit(lambda: TRUE)
.post('http://localhost:8080')
.log('` + tmpDir + `/alert.log')
`

task, err := cli.CreateTask(client.CreateTaskOptions{
ID: id,
Type: ttype,
DBRPs: dbrps,
TICKscript: tick,
Status: client.Disabled,
})
if err != nil {
t.Fatal(err)
}
recording, err := cli.RecordStream(client.RecordStreamOptions{
ID: "recordingid",
Task: task.ID,
Stop: time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC),
})
if err != nil {
t.Fatal(err)
}
if exp, got := "/kapacitor/v1/recordings/recordingid", recording.Link.Href; exp != got {
t.Errorf("unexpected recording.Link.Href got %s exp %s", got, exp)
}

points := `test value=1 0000000000
test value=1 0000000001
test value=1 0000000001
test value=1 0000000002
test value=1 0000000002
test value=1 0000000003
test value=1 0000000003
test value=1 0000000004
test value=1 0000000005
test value=1 0000000005
test value=1 0000000005
test value=1 0000000006
test value=1 0000000007
test value=1 0000000008
test value=1 0000000009
test value=1 0000000010
test value=1 0000000011
test value=1 0000000012
`
v := url.Values{}
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

retry := 0
for recording.Status == client.Running {
time.Sleep(100 * time.Millisecond)
recording, err = cli.Recording(recording.Link)
if err != nil {
t.Fatal(err)
}
retry++
if retry > 10 {
t.Fatal("failed to finish recording")
}
}
if recording.Status != client.Finished || recording.Error != "" {
t.Errorf("recording failed: %s", recording.Error)
}

replay, err := cli.CreateReplay(client.CreateReplayOptions{
ID: "replayid",
Task: id,
Recording: recording.ID,
Clock: client.Fast,
RecordingTime: true,
})
if err != nil {
t.Fatal(err)
}
if exp, got := "/kapacitor/v1/replays/replayid", replay.Link.Href; exp != got {
t.Errorf("unexpected replay.Link.Href got %s exp %s", got, exp)
}
if exp, got := id, replay.Task; exp != got {
t.Errorf("unexpected replay.Task got %s exp %s", got, exp)
}

retry = 0
for replay.Status == client.Running {
time.Sleep(100 * time.Millisecond)
replay, err = cli.Replay(replay.Link)
if err != nil {
t.Fatal(err)
}
retry++
if retry > 10 {
t.Fatal("failed to finish replay")
}
}
if replay.Status != client.Finished || replay.Error != "" {
t.Errorf("replay failed: %s", replay.Error)
}

f, err := os.Open(filepath.Join(tmpDir, "alert.log"))
if err != nil {
t.Fatal(err)
}
defer f.Close()
type response struct {
ID string `json:"id"`
Message string `json:"message"`
Time time.Time `json:"time"`
Level string `json:"level"`
Data influxql.Result `json:"data"`
}
exp := response{
ID: "test-count",
Message: "test-count got: 15",
Time: time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC),
Level: "CRITICAL",
Data: influxql.Result{
Series: imodels.Rows{
{
Name: "test",
Columns: []string{"time", "count"},
Values: [][]interface{}{
{
time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC).Format(time.RFC3339Nano),
15.0,
},
},
},
},
},
}
got := response{}
d := json.NewDecoder(f)
d.Decode(&got)
if !reflect.DeepEqual(exp, got) {
t.Errorf("unexpected alert log:\ngot %v\nexp %v", got, exp)
}

recordings, err := cli.ListRecordings(nil)
if err != nil {
t.Error(err)
}
if exp, got := 1, len(recordings); exp != got {
t.Fatalf("unexpected recordings list:\ngot %v\nexp %v\nrecordings %v", got, exp, recordings)
}

err = cli.DeleteRecording(recordings[0].Link)
if err != nil {
t.Error(err)
}

recordings, err = cli.ListRecordings(nil)
if err != nil {
t.Error(err)
}
if exp, got := 0, len(recordings); exp != got {
t.Errorf("unexpected recordings list after delete:\ngot %v\nexp %v\nrecordings %v", got, exp, recordings)
}

replays, err := cli.ListReplays(nil)
if err != nil {
t.Error(err)
}
if exp, got := 1, len(replays); exp != got {
t.Fatalf("unexpected replays list:\ngot %v\nexp %v\nreplays %v", got, exp, replays)
}

err = cli.DeleteReplay(replays[0].Link)
if err != nil {
t.Error(err)
}

replays, err = cli.ListReplays(nil)
if err != nil {
t.Error(err)
}
if exp, got := 0, len(replays); exp != got {
t.Errorf("unexpected replays list after delete:\ngot %v\nexp %v\nreplays %v", got, exp, replays)
}
}

func TestServer_RecordReplayBatch(t *testing.T) {
c := NewConfig()
c.InfluxDB[0].Enabled = true
Expand Down
1 change: 1 addition & 0 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func (tm *TaskMaster) New(id string) *TaskMaster {
n.VictorOpsService = tm.VictorOpsService
n.PagerDutyService = tm.PagerDutyService
n.PushoverService = tm.PushoverService
n.HTTPPostService = tm.HTTPPostService
n.SlackService = tm.SlackService
n.TelegramService = tm.TelegramService
n.SNMPTrapService = tm.SNMPTrapService
Expand Down

0 comments on commit 6e45b39

Please sign in to comment.