Skip to content

Commit

Permalink
fix record query as stream with group by to save data in time order
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 13, 2016
1 parent dc2967a commit e6a7121
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Example UDF config for a socket based UDF.
- [#429](https://github.com/influxdata/kapacitor/issues/429): BREAKING: Change TICKscript parser to be left-associative on equal precedence operators. For example previously this statement `(1+2-3*4/5)` was evaluated as `(1+(2-(3*(4/5))))`
which is not the typical/expected behavior. Now using left-associative parsing the statement is evaluated as `((1+2)-((3*4)/5))`.
- [#456](https://github.com/influxdata/kapacitor/pull/456): Fixes Alerta integration to let server set status, fix `rawData` attribute and set default severity to `indeterminate`.
- [#423](https://github.com/influxdata/kapacitor/issues/423): Recording stream queries with group by now correctly saves data in time order not group by order.

## v0.12.0 [2016-04-04]

Expand Down
5 changes: 4 additions & 1 deletion cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,13 @@ func doRecord(args []string) error {
default:
return fmt.Errorf("Unknown record type %q, expected 'stream', 'batch' or 'query'", args[0])
}
_, err = cli.Recording(rid)
info, err := cli.Recording(rid)
if err != nil {
return err
}
if info.Error != "" {
return errors.New(info.Error)
}
fmt.Println(rid)
return nil
}
Expand Down
62 changes: 49 additions & 13 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,21 +807,57 @@ func (r *Service) doRecordQuery(rid uuid.UUID, q string, tt kapacitor.TaskType,
c.Close()
return err
}
for _, batch := range batches {
switch tt {
case kapacitor.StreamTask:
for _, bp := range batch.Points {
p := models.Point{
Name: batch.Name,
Database: db,
RetentionPolicy: rp,
Tags: bp.Tags,
Fields: bp.Fields,
Time: bp.Time,
switch tt {
case kapacitor.StreamTask:
// Write points in order across batches

// Find earliest time of first points
current := time.Time{}
for _, batch := range batches {
if len(batch.Points) > 0 &&
(current.IsZero() ||
batch.Points[0].Time.Before(current)) {
current = batch.Points[0].Time
}
}

finishedCount := 0
batchCount := len(batches)
for finishedCount != batchCount {
next := time.Time{}
for b := range batches {
l := len(batches[b].Points)
if l == 0 {
finishedCount++
continue
}
kapacitor.WritePointForRecording(w, p, precision)
i := 0
for ; i < l; i++ {
bp := batches[b].Points[i]
if bp.Time.After(current) {
if next.IsZero() || bp.Time.Before(next) {
next = bp.Time
}
break
}
// Write point
p := models.Point{
Name: batches[b].Name,
Database: db,
RetentionPolicy: rp,
Tags: bp.Tags,
Fields: bp.Fields,
Time: bp.Time,
}
kapacitor.WritePointForRecording(w, p, precision)
}
// Remove written points
batches[b].Points = batches[b].Points[i:]
}
case kapacitor.BatchTask:
current = next
}
case kapacitor.BatchTask:
for _, batch := range batches {
kapacitor.WriteBatchForRecording(w, batch)
}
}
Expand Down

0 comments on commit e6a7121

Please sign in to comment.