Skip to content
This repository has been archived by the owner on Feb 13, 2025. It is now read-only.

Commit

Permalink
cmd/bosun: Add over, shift, and merge funcs
Browse files Browse the repository at this point in the history
This adds the ability to graph multiple seriesSets in expression graphs, and also graph things like week-over-week. Fixes #985
  • Loading branch information
kylebrandt committed Feb 16, 2016
1 parent 8b7df5a commit 3bdc6da
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 4 deletions.
157 changes: 153 additions & 4 deletions cmd/bosun/expr/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ var TSDB = map[string]parse.Func{
Tags: tagQuery,
F: Band,
},
"shiftBand": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeScalar},
Return: models.TypeSeriesSet,
Tags: tagQuery,
F: ShiftBand,
},
"over": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeScalar},
Return: models.TypeSeriesSet,
Tags: tagQuery,
F: Over,
},
"change": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString},
Return: models.TypeNumberSet,
Expand Down Expand Up @@ -319,6 +331,19 @@ var builtins = map[string]parse.Func{
Tags: tagFirst,
F: Sort,
},
"shift": {
Args: []models.FuncType{models.TypeSeriesSet, models.TypeString},
Return: models.TypeSeriesSet,
Tags: tagFirst,
F: Shift,
},
"merge": {
Args: []models.FuncType{models.TypeSeriesSet},
VArgs: true,
Return: models.TypeSeriesSet,
Tags: tagFirst,
F: Merge,
},
}

func Epoch(e *State, T miniprofiler.Timer) (*Results, error) {
Expand Down Expand Up @@ -371,6 +396,43 @@ func Filter(e *State, T miniprofiler.Timer, series *Results, number *Results) (*
return series, nil
}

func Merge(e *State, T miniprofiler.Timer, series ...*Results) (*Results, error) {
if len(series) == 0 {
return &Results{}, fmt.Errorf("merge requires at least one result")
}
if len(series) == 1 {
return series[0], nil
}
res := series[0]
seen := make(map[string]bool)
for _, r := range series[1:] {
for _, entry := range r.Results {
if _, ok := seen[entry.Group.String()]; ok {
return res, fmt.Errorf("duplicate group in merge: %s", entry.Group.String())
}
seen[entry.Group.String()] = true
}
res.Results = append(res.Results, r.Results...)
}
return res, nil
}

func Shift(e *State, T miniprofiler.Timer, series *Results, d string) (*Results, error) {
dur, err := opentsdb.ParseDuration(d)
if err != nil {
return series, err
}
for _, result := range series.Results {
newSeries := make(Series)
for t, v := range result.Value.Value().(Series) {
newSeries[t.Add(time.Duration(dur))] = v
}
result.Group["shift"] = d
result.Value = newSeries
}
return series, nil
}

func Duration(e *State, T miniprofiler.Timer, d string) (*Results, error) {
duration, err := opentsdb.ParseDuration(d)
if err != nil {
Expand Down Expand Up @@ -560,7 +622,7 @@ func GraphiteBand(e *State, T miniprofiler.Timer, query, duration, period, forma
return
}

func bandTSDB(e *State, T miniprofiler.Timer, query, duration, period string, num float64, rfunc func(*Results, *opentsdb.Response) error) (r *Results, err error) {
func bandTSDB(e *State, T miniprofiler.Timer, query, duration, period string, num float64, rfunc func(*Results, *opentsdb.Response, time.Duration) error) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
Expand Down Expand Up @@ -609,7 +671,9 @@ func bandTSDB(e *State, T miniprofiler.Timer, query, duration, period string, nu
if e.squelched(res.Tags) {
continue
}
if err = rfunc(r, res); err != nil {
//offset := e.now.Sub(now.Add(time.Duration(p-d)))
offset := e.now.Sub(now)
if err = rfunc(r, res, offset); err != nil {
return
}
}
Expand All @@ -624,7 +688,7 @@ func Window(e *State, T miniprofiler.Timer, query, duration, period string, num
return nil, fmt.Errorf("expr: Window: no %v function", rfunc)
}
windowFn := reflect.ValueOf(fn.F)
bandFn := func(results *Results, resp *opentsdb.Response) error {
bandFn := func(results *Results, resp *opentsdb.Response, offset time.Duration) error {
values := make(Series)
min := int64(math.MaxInt64)
for k, v := range resp.DPS {
Expand Down Expand Up @@ -695,7 +759,7 @@ func windowCheck(t *parse.Tree, f *parse.FuncNode) error {
}

func Band(e *State, T miniprofiler.Timer, query, duration, period string, num float64) (r *Results, err error) {
r, err = bandTSDB(e, T, query, duration, period, num, func(r *Results, res *opentsdb.Response) error {
r, err = bandTSDB(e, T, query, duration, period, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error {
newarr := true
for _, a := range r.Results {
if !a.Group.Equal(res.Tags) {
Expand Down Expand Up @@ -732,6 +796,91 @@ func Band(e *State, T miniprofiler.Timer, query, duration, period string, num fl
return
}

func ShiftBand(e *State, T miniprofiler.Timer, query, duration, period string, num float64) (r *Results, err error) {
r, err = bandTSDB(e, T, query, duration, period, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error {
values := make(Series)
a := &Result{Group: res.Tags.Merge(opentsdb.TagSet{"shift": offset.String()})}
for k, v := range res.DPS {
i, e := strconv.ParseInt(k, 10, 64)
if e != nil {
return e
}
values[time.Unix(i, 0).Add(offset).UTC()] = float64(v)
}
a.Value = values
r.Results = append(r.Results, a)
return nil
})
if err != nil {
err = fmt.Errorf("expr: Band: %v", err)
}
return
}

func Over(e *State, T miniprofiler.Timer, query, duration, period string, num float64) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
T.Step("band", func(T miniprofiler.Timer) {
var d, p opentsdb.Duration
d, err = opentsdb.ParseDuration(duration)
if err != nil {
return
}
p, err = opentsdb.ParseDuration(period)
if err != nil {
return
}
if num < 1 || num > 100 {
err = fmt.Errorf("num out of bounds")
}
var q *opentsdb.Query
q, err = opentsdb.ParseQuery(query, e.tsdbContext.Version())
if err != nil {
return
}
if !e.tsdbContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
}
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
}
now := e.now
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
for i := 0; i < int(num); i++ {
var s opentsdb.ResponseSet
s, err = timeTSDBRequest(e, T, &req)
if err != nil {
return
}
offset := e.now.Sub(now)
for _, res := range s {
if e.squelched(res.Tags) {
continue
}
values := make(Series)
a := &Result{Group: res.Tags.Merge(opentsdb.TagSet{"shift": offset.String()})}
for k, v := range res.DPS {
i, err := strconv.ParseInt(k, 10, 64)
if err != nil {
return
}
values[time.Unix(i, 0).Add(offset).UTC()] = float64(v)
}
a.Value = values
r.Results = append(r.Results, a)
}
now = now.Add(time.Duration(-p))
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
}
})
return
}

func GraphiteQuery(e *State, T miniprofiler.Timer, query string, sduration, eduration, format string) (r *Results, err error) {
sd, err := opentsdb.ParseDuration(sduration)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions docs/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ Generic query from endDuration to startDuration ago. If endDuration is the empty

Band performs `num` queries of `duration` each, `period` apart and concatenates them together, starting `period` ago. So `band("avg:os.cpu", "1h", "1d", 7)` will return a series comprising of the given metric from 1d to 1d-1h-ago, 2d to 2d-1h-ago, etc, until 8d. This is a good way to get a time block from a certain hour of a day or certain day of a week over a long time period.

### over(query string, duration string, period string, num scalar) seriesSet
Over's arguments behave the same way as band. However over shifts the time of previous periods to be now, tags them with duration that each period was shifted, and merges those shifted periods into a single seriesSet. This is useful for displaying time over time graphs. For example, the same day week over week would be `over("avg:1h-avg:rate:os.cpu{host=ny-bosun01}", "1d", "1w", 4)`.

### change(query string, startDuration string, endDuration string) numberSet

Change is a way to determine the change of a query from startDuration to endDuration. If endDuration is the empty string (`""`), now is used. The query must either be a rate or a counter converted to a rate with the `agg:rate:metric` flag.
Expand Down Expand Up @@ -479,6 +482,14 @@ Returns the first count (scalar) results of number.

Returns the first key from the given lookup table with matching tags.

##shift(seriesSet, dur string) seriesSet

Shift takes a seriesSet and shifts the time forward by the value of dur ([OpenTSDB duration string](http://opentsdb.net/docs/build/html/user_guide/query/dates.html)) and adds a tag for representing the shift duration. This is meant so you can overlay times visually in a graph.

## merge(SeriesSet...) seriesSet

Merge takes multiple seriesSets and merges them into a single seriesSet. The function will error if any of the tag sets (groups) are identical. This is meant so you can display multiple seriesSets in a single expression graph.

## nv(numberSet, scalar) numberSet

Change the NaN value during binary operations (when joining two queries) of unknown groups to the scalar. This is useful to prevent unknown group and other errors from bubbling up.
Expand Down

0 comments on commit 3bdc6da

Please sign in to comment.