Skip to content

Commit

Permalink
Add Error method for step evaluators (#2223)
Browse files Browse the repository at this point in the history
  • Loading branch information
adityacs authored Jun 18, 2020
1 parent 815c475 commit 1729592
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 34 deletions.
6 changes: 4 additions & 2 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,13 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value,
seriesIndex := map[uint64]*promql.Series{}

next, ts, vec := stepEvaluator.Next()

if GetRangeType(q.params) == InstantType {
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec, nil
}

for next {

for _, p := range vec {
var (
series *promql.Series
Expand Down Expand Up @@ -227,7 +227,9 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value,
}
result := promql.Matrix(series)
sort.Sort(result)
return result, nil

err = stepEvaluator.Error()
return result, err
}

func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (parser.Value, error) {
Expand Down
142 changes: 114 additions & 28 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logql

import (
"context"
"errors"
"fmt"
"math"
"testing"
Expand All @@ -19,7 +20,11 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
)

var testSize = int64(300)
var (
testSize = int64(300)
ErrMock = errors.New("mock error")
ErrMockMultiple = errors.New("Multiple errors: [mock error mock error]")
)

func TestEngine_InstantQuery(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -902,9 +907,9 @@ func TestEngine_RangeQuery(t *testing.T) {
},
{
`
rate({app=~"foo|bar"}[1m]) unless
rate({app="bar"}[1m])
`,
rate({app=~"foo|bar"}[1m]) unless
rate({app="bar"}[1m])
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -928,9 +933,9 @@ func TestEngine_RangeQuery(t *testing.T) {
},
{
`
rate({app=~"foo|bar"}[1m]) +
rate({app="bar"}[1m])
`,
rate({app=~"foo|bar"}[1m]) +
rate({app="bar"}[1m])
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -954,9 +959,9 @@ func TestEngine_RangeQuery(t *testing.T) {
},
{
`
rate({app=~"foo|bar"}[1m]) -
rate({app="bar"}[1m])
`,
rate({app=~"foo|bar"}[1m]) -
rate({app="bar"}[1m])
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -980,9 +985,9 @@ func TestEngine_RangeQuery(t *testing.T) {
},
{
`
count_over_time({app=~"foo|bar"}[1m]) *
count_over_time({app="bar"}[1m])
`,
count_over_time({app=~"foo|bar"}[1m]) *
count_over_time({app="bar"}[1m])
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -1006,9 +1011,9 @@ func TestEngine_RangeQuery(t *testing.T) {
},
{
`
count_over_time({app=~"foo|bar"}[1m]) *
count_over_time({app="bar"}[1m])
`,
count_over_time({app=~"foo|bar"}[1m]) *
count_over_time({app="bar"}[1m])
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -1032,9 +1037,9 @@ func TestEngine_RangeQuery(t *testing.T) {
},
{
`
count_over_time({app=~"foo|bar"}[1m]) /
count_over_time({app="bar"}[1m])
`,
count_over_time({app=~"foo|bar"}[1m]) /
count_over_time({app="bar"}[1m])
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -1058,9 +1063,9 @@ func TestEngine_RangeQuery(t *testing.T) {
},
{
`
count_over_time({app=~"foo|bar"}[1m]) %
count_over_time({app="bar"}[1m])
`,
count_over_time({app=~"foo|bar"}[1m]) %
count_over_time({app="bar"}[1m])
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -1085,10 +1090,10 @@ func TestEngine_RangeQuery(t *testing.T) {
// tests precedence: should be x + (x/x)
{
`
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) +
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) /
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))
`,
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) +
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) /
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand All @@ -1115,8 +1120,8 @@ func TestEngine_RangeQuery(t *testing.T) {
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) +
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) /
sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))
) * 2
`,
) * 2
`,
time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Stream{
{
Expand Down Expand Up @@ -1426,6 +1431,49 @@ func TestEngine_Stats(t *testing.T) {
require.Equal(t, int64(1), r.Statistics.Store.DecompressedBytes)
}

func TestStepEvaluator_Error(t *testing.T) {
tests := []struct {
name string
qs string
iters []iter.EntryIterator
err error
}{
{
"rangeAggEvaluator",
`count_over_time({app="foo"}[1m])`,
[]iter.EntryIterator{
iter.NewStreamIterator(newStream(testSize, identity, `{app="foo"}`)),
NewMockStreamIterator(newStream(testSize, identity, `{app="foo"}`)),
},
ErrMock,
},
{
"binOpStepEvaluator",
`count_over_time({app="foo"}[1m]) / count_over_time({app="foo"}[1m])`,
[]iter.EntryIterator{
iter.NewStreamIterator(newStream(testSize, identity, `{app="foo"}`)),
NewMockStreamIterator(newStream(testSize, identity, `{app="foo"}`)),
},
ErrMockMultiple,
},
}

for _, tc := range tests {
queryfunc := QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) {
return iter.NewHeapIterator(ctx, tc.iters, p.Direction), nil
})
eng := NewEngine(EngineOpts{}, queryfunc)
q := eng.Query(LiteralParams{
qs: tc.qs,
start: time.Unix(0, 0),
end: time.Unix(180, 0),
step: 1 * time.Second,
})
_, err := q.Exec(context.Background())
require.Equal(t, tc.err, err)
}
}

// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
func BenchmarkRangeQuery100000(b *testing.B) {
benchmarkRangeQuery(int64(100000), b)
Expand Down Expand Up @@ -1643,3 +1691,41 @@ func inverse(g generator) generator {
return g(-i)
}
}

// mockstreamIterator mocks error in iterator
type mockStreamIterator struct {
i int
entries []logproto.Entry
labels string
err error
}

// NewMockStreamIterator mocks error in iterator
func NewMockStreamIterator(stream logproto.Stream) iter.EntryIterator {
return &mockStreamIterator{
i: -1,
entries: stream.Entries,
labels: stream.Labels,
}
}

func (i *mockStreamIterator) Next() bool {
i.err = ErrMock
return false
}

func (i *mockStreamIterator) Error() error {
return i.err
}

func (i *mockStreamIterator) Labels() string {
return i.labels
}

func (i *mockStreamIterator) Entry() logproto.Entry {
return i.entries[i.i]
}

func (i *mockStreamIterator) Close() error {
return nil
}
22 changes: 20 additions & 2 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logql
import (
"container/heap"
"context"
"fmt"
"math"
"sort"
"time"
Expand Down Expand Up @@ -370,15 +371,14 @@ func vectorAggEvaluator(
}
return next, ts, vec

}, nextEvaluator.Close)
}, nextEvaluator.Close, nextEvaluator.Error)
}

func rangeAggEvaluator(
entryIter iter.EntryIterator,
expr *rangeAggregationExpr,
q Params,
) (StepEvaluator, error) {

agg, err := expr.aggregator()
if err != nil {
return nil, err
Expand Down Expand Up @@ -414,6 +414,8 @@ func (r rangeVectorEvaluator) Next() (bool, int64, promql.Vector) {

func (r rangeVectorEvaluator) Close() error { return r.iter.Close() }

func (r rangeVectorEvaluator) Error() error { return r.iter.Error() }

// binOpExpr explicitly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(
Expand Down Expand Up @@ -510,6 +512,21 @@ func binOpStepEvaluator(
}
}
return lastError
}, func() error {
var errs []error
for _, ev := range []StepEvaluator{lhs, rhs} {
if err := ev.Error(); err != nil {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("Multiple errors: %+v", errs)
}
})
}

Expand Down Expand Up @@ -834,5 +851,6 @@ func literalStepEvaluator(
return ok, ts, results
},
eval.Close,
eval.Error,
)
}
2 changes: 2 additions & 0 deletions pkg/logql/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,5 @@ func (m *MatrixStepper) Next() (bool, int64, promql.Vector) {
}

func (m *MatrixStepper) Close() error { return nil }

func (m *MatrixStepper) Error() error { return nil }
5 changes: 5 additions & 0 deletions pkg/logql/range_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type RangeVectorIterator interface {
Next() bool
At(aggregator RangeVectorAggregator) (int64, promql.Vector)
Close() error
Error() error
}

type rangeVectorIterator struct {
Expand Down Expand Up @@ -64,6 +65,10 @@ func (r *rangeVectorIterator) Close() error {
return r.iter.Close()
}

func (r *rangeVectorIterator) Error() error {
return r.iter.Error()
}

// popBack removes all entries out of the current window from the back.
func (r *rangeVectorIterator) popBack(newStart int64) {
// possible improvement: if there is no overlap we can just remove all.
Expand Down
5 changes: 5 additions & 0 deletions pkg/logql/series_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type SeriesIterator interface {
Close() error
Next() bool
Peek() (Sample, bool)
Error() error
}

// Sample is a series sample
Expand Down Expand Up @@ -72,6 +73,10 @@ func (e *seriesIterator) Peek() (Sample, bool) {
return e.cur, true
}

func (e *seriesIterator) Error() error {
return e.iter.Error()
}

// SampleExtractor transforms a log entry into a sample.
// In case of failure the second return value will be false.
type SampleExtractor interface {
Expand Down
18 changes: 17 additions & 1 deletion pkg/logql/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,22 @@ func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) {
}
return lastErr
},
func() error {
var errs []error
for _, eval := range evaluators {
if err := eval.Error(); err != nil {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("Multiple errors: %+v", errs)
}
},
)
}

Expand All @@ -370,7 +386,7 @@ func ResultStepEvaluator(res Result, params Params) (StepEvaluator, error) {
return true, start.UnixNano() / int64(time.Millisecond), data
}
return false, 0, nil
}, nil)
}, nil, nil)
case promql.Matrix:
return NewMatrixStepper(start, end, step, data), nil
default:
Expand Down
Loading

0 comments on commit 1729592

Please sign in to comment.